This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad64e72cf3eaede9a4db6313e72f22340a133b00
Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com>
AuthorDate: Thu Jun 22 20:06:10 2023 +0200

    [FLINK-28050][connectors] Introduce FLIP-27 alternative to StreamExecutionEnvironment#fromElements()
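
    A minimal usage sketch of the new API, mirroring the
    FromElementsGeneratorSourceITCase added in this commit (class and
    constructor names are taken from the diff below; everything around
    the snippet is assumed):

        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.typeinfo.Types;
        import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
        import org.apache.flink.connector.datagen.source.DataGeneratorSource;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FLIP-27 style: wrap the fixed elements in a GeneratorFunction and hand
        // it to a DataGeneratorSource instead of calling env.fromElements(...).
        FromElementsGeneratorFunction<String> fn =
                new FromElementsGeneratorFunction<>(Types.STRING, "Foo", "bar", "baz");
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(fn, 3, Types.STRING); // 3 = number of elements
        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "from-elements source");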
---
 .../e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5           |   8 +-
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e           |  87 +++++----
 .../flink-architecture-tests-production/pom.xml    |   5 +
 flink-architecture-tests/pom.xml                   |   7 +
 .../6dae736c-6957-4d04-93bf-d7ebc5ca97ab           |   0
 .../f2b2daa5-8ca8-4a74-bb17-cb9f49e3e0e2           |   0
 .../archunit-violations/stored.rules               |   4 +
 .../flink-connector-datagen-test/pom.xml           |  49 ++++-
 .../FromElementsGeneratorSourceITCase.java         | 122 +++++++++++++
 .../datagen/source/DataGeneratorSourceITCase.java  |  56 +++++-
 .../src/test/resources/avro/user.avsc              |   9 +
 .../functions/FromElementsGeneratorFunction.java   | 200 +++++++++++++++++++++
 .../datagen/source/DataGeneratorSource.java        |  35 +++-
 .../api/operators}/OutputTypeConfigurable.java     |   8 +-
 ...st_stream_execution_environment_completeness.py |   2 +-
 flink-streaming-java/pom.xml                       |   6 +
 .../streaming/api/datastream/DataStreamSource.java |   2 +-
 .../environment/StreamExecutionEnvironment.java    | 110 ++++++++++++
 .../api/operators/OutputTypeConfigurable.java      |  49 -----
 .../api/operators/SimpleOperatorFactory.java       |  13 +-
 .../api/operators/SourceOperatorFactory.java       |   9 +-
 .../apache/flink/streaming/api/DataStreamTest.java |   7 -
 .../StreamExecutionEnvironmentTest.java            |  57 ++++--
 .../api/graph/StreamGraphGeneratorTest.java        |  19 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  16 +-
 .../MultipleInputNodeCreationProcessorTest.java    |   4 +-
 flink-tests/pom.xml                                |  45 ++++-
 .../completeness/TypeInfoTestCoverageTest.java     |   6 +-
 .../TypeSerializerTestCoverageTest.java            |   7 +-
 .../api/StreamExecutionEnvironmentITCase.java      |  87 +++++++--
 flink-tests/src/test/resources/avro/user.avsc      |   9 +
 pom.xml                                            |   3 +
 32 files changed, 871 insertions(+), 170 deletions(-)

diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
index 1fea7816ed1..dbca0076986 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5
@@ -17,10 +17,10 @@ Method 
<org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServi
 Method 
<org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.revokeResourceManagerLeadership()>
 calls method 
<org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.revokeLeadership()>
 in (EmbeddedHaServicesWithLeadershipControl.java:102)
 Method 
<org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl.listDataSets()>
 calls method 
<org.apache.flink.runtime.io.network.partition.DataSetMetaInfo.withNumRegisteredPartitions(int,
 int)> in (ResourceManagerPartitionTrackerImpl.java:286)
 Method 
<org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.closeAsync(long)>
 calls method 
<org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.quiesce()>
 in (RecreateOnResetOperatorCoordinator.java:361)
-Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, 
java.lang.String, java.io.File)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec,
 int)> in (TaskManagerConfiguration.java:244)
-Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, 
java.lang.String, java.io.File)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)>
 in (TaskManagerConfiguration.java:246)
-Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, 
java.util.concurrent.Executor)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec,
 int)> in (TaskManagerServices.java:433)
-Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, 
java.util.concurrent.Executor)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)>
 in (TaskManagerServices.java:431)
+Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, 
java.lang.String, java.io.File)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec,
 int)> in (TaskManagerConfiguration.java:241)
+Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, 
java.lang.String, java.io.File)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)>
 in (TaskManagerConfiguration.java:243)
+Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, 
java.util.concurrent.Executor)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec,
 int)> in (TaskManagerServices.java:450)
+Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int,
 org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, 
java.util.concurrent.Executor)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)>
 in (TaskManagerServices.java:448)
 Method 
<org.apache.flink.streaming.api.operators.SourceOperator$1$1.asClassLoader()> 
calls method 
<org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> 
in (SourceOperator.java:295)
 Method 
<org.apache.flink.streaming.api.operators.SourceOperator$1$1.registerReleaseHookIfAbsent(java.lang.String,
 java.lang.Runnable)> calls method 
<org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> 
in (SourceOperator.java:301)
 Method 
<org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init()> 
calls method 
<org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in 
(SourceOperatorStreamTask.java:95)
diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index 0c69f24bc8b..95c466ee545 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -64,6 +64,18 @@ Constructor 
<org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.
 Constructor 
<org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.<init>(int, 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue,
 org.apache.flink.connector.base.source.reader.splitreader.SplitReader, 
java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer, 
boolean)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(SplitFetcher.java:97)
 Constructor 
<org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.<init>(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue,
 java.util.function.Supplier, org.apache.flink.configuration.Configuration, 
java.util.function.Consumer)> is annotated with 
<org.apache.flink.annotation.VisibleForTesting> in (SplitFetcherManager.java:0)
 Constructor 
<org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.<init>(int)>
 calls method <org.apache.flink.util.Preconditions.checkArgument(boolean, 
java.lang.Object)> in (FutureCompletingBlockingQueue.java:114)
+Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory,
 org.apache.flink.connector.datagen.source.GeneratorFunction, long, 
org.apache.flink.api.common.typeinfo.TypeInformation)> calls method 
<org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, 
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in 
(DataGeneratorSource.java:154)
+Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory,
 org.apache.flink.connector.datagen.source.GeneratorFunction, long, 
org.apache.flink.api.common.typeinfo.TypeInformation)> calls method 
<org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, 
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in 
(DataGeneratorSource.java:156)
+Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory,
 org.apache.flink.connector.datagen.source.GeneratorFunction, long, 
org.apache.flink.api.common.typeinfo.TypeInformation)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(DataGeneratorSource.java:150)
+Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory,
 org.apache.flink.connector.datagen.source.GeneratorFunction, long, 
org.apache.flink.api.common.typeinfo.TypeInformation)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(DataGeneratorSource.java:151)
+Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory,
 org.apache.flink.connector.datagen.source.GeneratorFunction, long, 
org.apache.flink.api.common.typeinfo.TypeInformation)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(DataGeneratorSource.java:152)
+Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 long, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy.noOp()>
 in (DataGeneratorSource.java:120)
+Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 long, 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy, 
org.apache.flink.api.common.typeinfo.TypeInformation)> calls method 
<org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, 
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in 
(DataGeneratorSource.java:141)
+Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 long, 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy, 
org.apache.flink.api.common.typeinfo.TypeInformation)> has parameter of type 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy> in 
(DataGeneratorSource.java:0)
+Constructor 
<org.apache.flink.connector.datagen.source.GeneratingIteratorSourceReader.<init>(org.apache.flink.api.connector.source.SourceReaderContext,
 org.apache.flink.connector.datagen.source.GeneratorFunction)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(GeneratingIteratorSourceReader.java:46)
+Constructor 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> 
calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(GeneratorSourceReaderFactory.java:54)
+Constructor 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> 
calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(GeneratorSourceReaderFactory.java:55)
+Constructor 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> has 
parameter of type 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy> in 
(GeneratorSourceReaderFactory.java:0)
 Constructor 
<org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, 
java.lang.Long)> depends on component type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(DataGenTableSource.java:0)
 Constructor 
<org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, 
java.lang.Long)> has parameter of type 
<[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in 
(DataGenTableSource.java:0)
 Constructor 
<org.apache.flink.connector.datagen.table.DataGenVisitorBase.<init>(java.lang.String,
 org.apache.flink.configuration.ReadableConfig)> calls constructor 
<org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.<init>()> 
in (DataGenVisitorBase.java:49)
@@ -213,6 +225,7 @@ Constructor 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketW
 Constructor 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter,
 org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter, 
org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter$1)> 
has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in 
(CompactBucketWriter.java:0)
 Constructor 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter,
 org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter, 
org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter$1)> 
has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter> 
in (CompactBucketWriter.java:0)
 Constructor 
<org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.<init>(org.apache.flink.table.connector.sink.DynamicTableSink$DataStructureConverter,
 java.lang.String, boolean)> calls constructor 
<org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.<init>(java.lang.String,
 boolean)> in (PrintTableSinkFactory.java:173)
+Field 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.rateLimiterStrategy>
 has type 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy> in 
(GeneratorSourceReaderFactory.java:0)
 Field 
<org.apache.flink.connector.datagen.table.DataGenTableSource.fieldGenerators> 
depends on component type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(DataGenTableSource.java:0)
 Field 
<org.apache.flink.connector.datagen.table.DataGenTableSource.fieldGenerators> 
has type 
<[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in 
(DataGenTableSource.java:0)
 Field 
<org.apache.flink.connector.datagen.table.DataGeneratorContainer.generator> has 
type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(DataGeneratorContainer.java:0)
@@ -274,16 +287,22 @@ Method 
<org.apache.flink.connector.base.source.hybrid.HybridSource$HybridSourceB
 Method 
<org.apache.flink.connector.base.source.hybrid.HybridSource$HybridSourceBuilder.addSource(org.apache.flink.connector.base.source.hybrid.HybridSource$SourceFactory,
 org.apache.flink.api.connector.source.Boundedness)> calls method 
<org.apache.flink.util.Preconditions.checkArgument(boolean, java.lang.Object)> 
in (HybridSource.java:242)
 Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(java.util.List)>
 calls method <org.apache.flink.util.Preconditions.checkState(boolean, 
java.lang.String, [Ljava.lang.Object;)> in (HybridSourceReader.java:153)
 Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(int)>
 calls method <org.apache.flink.util.Preconditions.checkArgument(boolean)> in 
(HybridSourceReader.java:206)
-Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> calls method 
<org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)>
 in (HybridSourceSplitEnumerator.java:446)
-Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> calls method 
<org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, 
[Ljava.lang.Object;)> in (HybridSourceSplitEnumerator.java:438)
-Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> checks instanceof 
<org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in 
(HybridSourceSplitEnumerator.java:438)
+Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> calls method 
<org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)>
 in (HybridSourceSplitEnumerator.java:447)
+Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> calls method 
<org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, 
[Ljava.lang.Object;)> in (HybridSourceSplitEnumerator.java:439)
+Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> checks instanceof 
<org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in 
(HybridSourceSplitEnumerator.java:439)
 Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSplitRequest(int,
 java.lang.String)> calls method 
<org.apache.flink.util.Preconditions.checkState(boolean)> in 
(HybridSourceSplitEnumerator.java:110)
 Method <org.apache.flink.connector.base.source.hybrid.SwitchedSources.put(int, 
org.apache.flink.api.connector.source.Source)> calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(SwitchedSources.java:48)
 Method 
<org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(int)> 
calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, 
java.lang.String, [Ljava.lang.Object;)> in (SwitchedSources.java:38)
 Method 
<org.apache.flink.connector.base.source.reader.SourceReaderBase.moveToNextSplit(org.apache.flink.connector.base.source.reader.RecordsWithSplitIds,
 org.apache.flink.api.connector.source.ReaderOutput)> calls method 
<org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in 
(SourceReaderBase.java:229)
 Method 
<org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.getNumAliveFetchers()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(SplitFetcherManager.java:0)
 Method 
<org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.take()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(FutureCompletingBlockingQueue.java:0)
-Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> 
calls constructor 
<org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator,
 long, java.lang.Long)> in (DataGenTableSource.java:71)
+Method 
<org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.getSerializer()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(FromElementsGeneratorFunction.java:0)
+Method 
<org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.serializeElements(java.lang.Iterable)>
 calls method <org.apache.flink.util.Preconditions.checkState(boolean, 
java.lang.Object)> in (FromElementsGeneratorFunction.java:97)
+Method 
<org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.setOutputType(org.apache.flink.api.common.typeinfo.TypeInformation,
 org.apache.flink.api.common.ExecutionConfig)> calls method 
<org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in 
(FromElementsGeneratorFunction.java:151)
+Method 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.getGeneratorFunction()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(DataGeneratorSource.java:0)
+Method 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.createReader(org.apache.flink.api.connector.source.SourceReaderContext)>
 calls constructor 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.<init>(org.apache.flink.api.connector.source.SourceReader,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiter)> in 
(GeneratorSourceReaderFactory.java:63)
+Method 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.createReader(org.apache.flink.api.connector.source.SourceReaderContext)>
 calls method 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy.createRateLimiter(int)>
 in (GeneratorSourceReaderFactory.java:62)
+Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> 
calls constructor 
<org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator,
 long, java.lang.Long)> in (DataGenTableSource.java:67)
 Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> 
has return type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource> 
in (DataGenTableSource.java:0)
 Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> is 
annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(DataGenTableSource.java:0)
 Method 
<org.apache.flink.connector.datagen.table.DataGeneratorContainer.getGenerator()>
 has return type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(DataGeneratorContainer.java:0)
@@ -748,8 +767,8 @@ Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.processE
 Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.PartitionCommitInfo>>
 with type argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(PartitionCommitter.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(PartitionCommitter.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitPredicate.isPartitionCommittable(org.apache.flink.connector.file.table.stream.PartitionCommitPredicate$PredicateContext)>
 calls method 
<org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues(org.apache.flink.core.fs.Path)>
 in (PartitionTimeCommitPredicate.java:71)
-Method 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger$1.currentProcTime()>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (ProcTimeCommitTrigger.java:110)
-Method 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.addPartition(java.lang.String)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (ProcTimeCommitTrigger.java:75)
+Method 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger$1.currentProcTime()>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (ProcTimeCommitTrigger.java:111)
+Method 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.addPartition(java.lang.String)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (ProcTimeCommitTrigger.java:76)
 Method 
<org.apache.flink.connector.file.table.stream.StreamingFileWriter.closePartFileForPartitions()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.closePartFileForBucket(java.lang.Object)>
 in (StreamingFileWriter.java:130)
 Method 
<org.apache.flink.connector.file.table.stream.StreamingFileWriter.closePartFileForPartitions()>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (StreamingFileWriter.java:126)
 Method 
<org.apache.flink.connector.file.table.stream.StreamingFileWriter.commitUpToCheckpoint(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (StreamingFileWriter.java:154)
@@ -765,14 +784,14 @@ Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.commit()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.closeForCommit()>
 in (CompactBucketWriter.java:49)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.factory(org.apache.flink.util.function.SupplierWithException)>
 has generic parameter type 
<org.apache.flink.util.function.SupplierWithException<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<T,
 java.lang.String>, java.io.IOException>> with type argument depending on 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in 
(CompactBucketWriter.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.write(java.lang.Object)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.write(java.lang.Object,
 long)> in (CompactBucketWriter.java:44)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long,
 java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactCoordinator.java:185)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long,
 java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactCoordinator.java:193)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (CompactCoordinator.java:115)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(CompactCoordinator.java:115)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class,
 org.apache.flink.api.common.ExecutionConfig)> in (CompactCoordinator.java:115)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(CompactCoordinator.java:106)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in 
(CompactCoordinator.java:106)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactCoordinator.java:126)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long,
 java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactCoordinator.java:186)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long,
 java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactCoordinator.java:194)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (CompactCoordinator.java:116)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(CompactCoordinator.java:116)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class,
 org.apache.flink.api.common.ExecutionConfig)> in (CompactCoordinator.java:116)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(CompactCoordinator.java:107)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in 
(CompactCoordinator.java:107)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactCoordinator.java:127)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.compact.CompactMessages$CoordinatorInput>>
 with type argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactCoordinator.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactCoordinator.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactFileWriter.java:63)
@@ -780,19 +799,19 @@ Method 
<org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.c
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()>
 in (CompactFileWriter.java:63)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.onPartFileOpened(java.lang.String,
 org.apache.flink.core.fs.Path)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactFileWriter.java:52)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.convertFromUncompacted(org.apache.flink.core.fs.Path)>
 calls method <org.apache.flink.util.Preconditions.checkArgument(boolean, 
java.lang.Object)> in (CompactOperator.java:211)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactOperator.java:161)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()>
 in (CompactOperator.java:159)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()>
 in (CompactOperator.java:160)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (CompactOperator.java:108)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(CompactOperator.java:108)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class,
 org.apache.flink.api.common.ExecutionConfig)> in (CompactOperator.java:108)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(CompactOperator.java:102)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in 
(CompactOperator.java:138)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()>
 in (CompactOperator.java:139)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()>
 in (CompactOperator.java:125)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()>
 in (CompactOperator.java:124)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactOperator.java:120)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.StreamTask.getEnvironment()> in 
(CompactOperator.java:137)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactOperator.java:162)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()>
 in (CompactOperator.java:160)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()>
 in (CompactOperator.java:161)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (CompactOperator.java:109)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(CompactOperator.java:109)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class,
 org.apache.flink.api.common.ExecutionConfig)> in (CompactOperator.java:109)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(CompactOperator.java:103)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in 
(CompactOperator.java:139)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()>
 in (CompactOperator.java:140)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()>
 in (CompactOperator.java:126)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()>
 in (CompactOperator.java:125)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactOperator.java:121)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.StreamTask.getEnvironment()> in 
(CompactOperator.java:138)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.compact.CompactMessages$CoordinatorOutput>>
 with type argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactOperator.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactOperator.java:0)
 Method 
<org.apache.flink.connector.file.table.utils.CompactFileUtils.doSingleFileMove(org.apache.flink.core.fs.FileSystem,
 org.apache.flink.core.fs.Path, org.apache.flink.core.fs.Path)> calls method 
<org.apache.flink.util.IOUtils.copyBytes(java.io.InputStream, 
java.io.OutputStream, boolean)> in (CompactFileUtils.java:117)
@@ -807,10 +826,10 @@ Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCo
 Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()>
 calls method 
<org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory.getInstance()>
 in (StandardDeCompressors.java:43)
 Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()>
 calls method 
<org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.getInstance()>
 in (StandardDeCompressors.java:44)
 Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()>
 calls method 
<org.apache.flink.api.common.io.compression.XZInputStreamFactory.getInstance()> 
in (StandardDeCompressors.java:46)
-Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (PartitionTimeCommitTrigger.java:51)
-Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(PartitionTimeCommitTrigger.java:55)
-Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(PartitionTimeCommitTrigger.java:55)
-Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in 
(PartitionTimeCommitTrigger.java:51)
-Static Initializer 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> 
calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(ProcTimeCommitTrigger.java:46)
-Static Initializer 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> 
gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> 
in (ProcTimeCommitTrigger.java:46)
-Static Initializer 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> 
gets field 
<org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in 
(ProcTimeCommitTrigger.java:46)
+Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (PartitionTimeCommitTrigger.java:52)
+Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(PartitionTimeCommitTrigger.java:56)
+Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(PartitionTimeCommitTrigger.java:56)
+Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in 
(PartitionTimeCommitTrigger.java:52)
+Static Initializer 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> 
calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(ProcTimeCommitTrigger.java:47)
+Static Initializer 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> 
gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> 
in (ProcTimeCommitTrigger.java:47)
+Static Initializer 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> 
gets field 
<org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in 
(ProcTimeCommitTrigger.java:47)
\ No newline at end of file
diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/pom.xml 
b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
index fc0bd3c1046..4fe46212390 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/pom.xml
+++ b/flink-architecture-tests/flink-architecture-tests-production/pom.xml
@@ -138,6 +138,11 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-datagen</artifactId>
+               </dependency>
        </dependencies>
 
        <build>
diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml
index 34ffe550245..5b72c68b25e 100644
--- a/flink-architecture-tests/pom.xml
+++ b/flink-architecture-tests/pom.xml
@@ -153,6 +153,13 @@ under the License.
                                <scope>test</scope>
                        </dependency>
 
+                       <dependency>
+                               <groupId>org.apache.flink</groupId>
+                               <artifactId>flink-connector-datagen</artifactId>
+                               <version>${project.version}</version>
+                               <scope>test</scope>
+                       </dependency>
+
                        <!-- Test Utils -->
 
                        <dependency>
diff --git 
a/flink-connectors/flink-connector-datagen-test/archunit-violations/6dae736c-6957-4d04-93bf-d7ebc5ca97ab
 
b/flink-connectors/flink-connector-datagen-test/archunit-violations/6dae736c-6957-4d04-93bf-d7ebc5ca97ab
new file mode 100644
index 00000000000..e69de29bb2d
diff --git 
a/flink-connectors/flink-connector-datagen-test/archunit-violations/f2b2daa5-8ca8-4a74-bb17-cb9f49e3e0e2
 
b/flink-connectors/flink-connector-datagen-test/archunit-violations/f2b2daa5-8ca8-4a74-bb17-cb9f49e3e0e2
new file mode 100644
index 00000000000..e69de29bb2d
diff --git 
a/flink-connectors/flink-connector-datagen-test/archunit-violations/stored.rules
 
b/flink-connectors/flink-connector-datagen-test/archunit-violations/stored.rules
new file mode 100644
index 00000000000..551a540797d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen-test/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Sat Oct 14 00:17:16 CEST 2023
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ 
extension=6dae736c-6957-4d04-93bf-d7ebc5ca97ab
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ 
ITCase=f2b2daa5-8ca8-4a74-bb17-cb9f49e3e0e2
diff --git a/flink-connectors/flink-connector-datagen-test/pom.xml 
b/flink-connectors/flink-connector-datagen-test/pom.xml
index 5b89683ba20..20bdb4c6d16 100644
--- a/flink-connectors/flink-connector-datagen-test/pom.xml
+++ b/flink-connectors/flink-connector-datagen-test/pom.xml
@@ -29,7 +29,7 @@
        </parent>
 
        <artifactId>flink-connector-datagen-tests</artifactId>
-       <name>Flink : Connectors : Datagen</name>
+       <name>Flink : Connectors : Datagen Tests</name>
 
        <packaging>jar</packaging>
 
@@ -71,6 +71,13 @@
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- ArchUnit test dependencies -->
 
                <dependency>
@@ -91,4 +98,44 @@
                </dependency>
        </dependencies>
 
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>${avro.version}</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+                                                       
<testOutputDirectory>${project.basedir}/target/generated-test-sources/avro</testOutputDirectory>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               
<artifactId>build-helper-maven-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>add-avro-test-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       
<goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>${project.build.directory}/generated-test-sources/avro</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
 </project>
diff --git 
a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorSourceITCase.java
 
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorSourceITCase.java
new file mode 100644
index 00000000000..4c0df21de4b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorSourceITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.generated.User;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Integration tests for {@code FromElementsGeneratorFunction}. */
+class FromElementsGeneratorSourceITCase extends TestLogger {
+
+    private static final int PARALLELISM = 1;
+
+    @RegisterExtension
+    private static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    @DisplayName("Produces expected String output")
+    void testBasicType() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        String[] data = {"Foo", "bar", "baz"};
+        FromElementsGeneratorFunction<String> generatorFunction =
+                new FromElementsGeneratorFunction<>(Types.STRING, data);
+        DataGeneratorSource<String> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunction, data.length, 
Types.STRING);
+        DataStream<String> stream =
+                env.fromSource(
+                        dataGeneratorSource, WatermarkStrategy.noWatermarks(), 
"generator source");
+
+        List<String> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<String> serializer = generatorFunction.getSerializer();
+
+        assertThat(serializer).isEqualTo(Types.STRING.createSerializer(new 
ExecutionConfig()));
+        assertThat(result).containsExactly(data);
+    }
+
+    @Test
+    @DisplayName("Handles Avro data correctly")
+    void testAvroType() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        User user1 = new User("Foo", 1);
+        User user2 = new User("Bar", 2);
+        User[] data = {user1, user2};
+        FromElementsGeneratorFunction<User> generatorFunction =
+                new 
FromElementsGeneratorFunction<>(TypeExtractor.createTypeInfo(User.class), data);
+        DataGeneratorSource<User> dataGeneratorSource =
+                new DataGeneratorSource<>(
+                        generatorFunction, data.length, new 
AvroTypeInfo<>(User.class));
+        DataStream<User> stream =
+                env.fromSource(
+                        dataGeneratorSource, WatermarkStrategy.noWatermarks(), 
"generator source");
+
+        List<User> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<User> serializer = generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    @Test
+    @DisplayName("Throws an exception when more elements are requested than available")
+    void testMoreElementsRequestedThanAvailable() {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        String[] data = {"foo", "bar"};
+        FromElementsGeneratorFunction<String> generatorFunction =
+                new FromElementsGeneratorFunction<>(Types.STRING, data);
+        DataGeneratorSource<String> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunction, data.length + 1, 
Types.STRING);
+        DataStream<String> stream =
+                env.fromSource(
+                        dataGeneratorSource, WatermarkStrategy.noWatermarks(), 
"generator source");
+
+        assertThatThrownBy(() -> stream.executeAndCollect(data.length + 1))
+                .hasRootCauseInstanceOf(NoSuchElementException.class);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
 
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
index 842982ac584..2aa1468caea 100644
--- 
a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
+++ 
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.connector.datagen.source;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
@@ -30,6 +33,8 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -200,6 +205,55 @@ class DataGeneratorSourceITCase extends TestLogger {
         assertThat(results).hasSize(capacityPerCheckpoint);
     }
 
+    @Test
+    @DisplayName("Stream returns() call is propagated correctly.")
+    @SuppressWarnings("unchecked")
+    void testReturnsIsCorrectlyPropagated() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        OutputTypeConfigurableGeneratorFunction generatorFunction =
+                new 
OutputTypeConfigurableGeneratorFunction(BasicTypeInfo.STRING_TYPE_INFO);
+
+        final DataStreamSource<Long> streamSource =
+                getGeneratorSourceStream(generatorFunction, env, 1);
+        
streamSource.returns(BasicTypeInfo.LONG_TYPE_INFO).executeAndCollect(1);
+
+        DataGeneratorSource<Long> dataGeneratorSource =
+                (DataGeneratorSource<Long>)
+                        ((SourceTransformation<Long, ?, ?>) 
streamSource.getTransformation())
+                                .getSource();
+
+        
assertThat(dataGeneratorSource.getProducedType()).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO);
+        
assertThat(generatorFunction.getTypeInformation()).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static class OutputTypeConfigurableGeneratorFunction
+            implements GeneratorFunction<Long, Long>, 
OutputTypeConfigurable<Long> {
+
+        private TypeInformation typeInformation;
+
+        public OutputTypeConfigurableGeneratorFunction(TypeInformation typeInformation) {
+            this.typeInformation = typeInformation;
+        }
+
+        @Override
+        public Long map(Long value) throws Exception {
+            return 0L;
+        }
+
+        @Override
+        public void setOutputType(
+                TypeInformation<Long> outTypeInfo, ExecutionConfig 
executionConfig) {
+            typeInformation = outTypeInfo;
+        }
+
+        public TypeInformation getTypeInformation() {
+            return typeInformation;
+        }
+    }
+
     private static class FirstCheckpointFilter
             implements FlatMapFunction<Long, Long>, CheckpointedFunction {
 
@@ -221,7 +275,7 @@ class DataGeneratorSourceITCase extends TestLogger {
         public void initializeState(FunctionInitializationContext context) 
throws Exception {}
     }
 
-    private DataStream<Long> getGeneratorSourceStream(
+    private DataStreamSource<Long> getGeneratorSourceStream(
             GeneratorFunction<Long, Long> generatorFunction,
             StreamExecutionEnvironment env,
             long count) {
diff --git 
a/flink-connectors/flink-connector-datagen-test/src/test/resources/avro/user.avsc
 
b/flink-connectors/flink-connector-datagen-test/src/test/resources/avro/user.avsc
new file mode 100644
index 00000000000..504a06b9340
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen-test/src/test/resources/avro/user.avsc
@@ -0,0 +1,9 @@
+{
+  "namespace": "org.apache.flink.connector.datagen.source.generated",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "age", "type": "int"}
+  ]
+}
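
The avro-maven-plugin configuration added above compiles this schema into a
SpecificRecord class at build time. A rough sketch of the generated class in
use, assuming the plugin's default code generation (exact generated signatures
may differ):

    // Generated into org.apache.flink.connector.datagen.source.generated (see the pom changes above).
    User user = new User("Foo", 1);
    CharSequence name = user.getName(); // Avro maps "string" to CharSequence by default
    int age = user.getAge();
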
diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java
new file mode 100644
index 00000000000..1acd4f8de0c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * <p>This generator function serializes the elements using Flink's type information. That way,
+ * the function itself can be shipped via Java serialization regardless of whether the elements
+ * are Java-serializable.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+        implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+    /** The (de)serializer to be used for the data elements. */
+    private @Nullable TypeSerializer<OUT> serializer;
+
+    /** The actual data elements, in serialized form. */
+    private byte[] elementsSerialized;
+
+    /** The number of elements emitted already. */
+    private int numElementsEmitted;
+
+    private transient Iterable<OUT> elements;
+    private transient DataInputView input;
+
+    @SafeVarargs
+    public FromElementsGeneratorFunction(TypeInformation<OUT> typeInfo, OUT... 
elements) {
+        this(typeInfo, new ExecutionConfig(), Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(
+            TypeInformation<OUT> typeInfo, ExecutionConfig config, 
Iterable<OUT> elements) {
+        // the elements must be non-null and all assignable to the given type (no mixed types)
+        checkIterable(elements, typeInfo.getTypeClass());
+        this.serializer = typeInfo.createSerializer(config);
+        this.elements = elements;
+        trySerialize(elements);
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public TypeSerializer<OUT> getSerializer() {
+        return serializer;
+    }
+
+    private void serializeElements(Iterable<OUT> elements) throws IOException {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        LOG.info("Serializing elements using {}", serializer);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);
+
+        try {
+            for (OUT element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (Exception e) {
+            throw new IOException("Serializing the source elements failed: " + 
e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    @Override
+    public void open(SourceReaderContext readerContext) throws Exception {
+        ByteArrayInputStream bais = new 
ByteArrayInputStream(elementsSerialized);
+        this.input = new DataInputViewStreamWrapper(bais);
+    }
+
+    @Override
+    public OUT map(Long nextIndex) throws Exception {
+        // Skip ahead in the serialized input to the required position (e.g. after failure recovery)
+        while (numElementsEmitted < nextIndex) {
+            numElementsEmitted++;
+            tryDeserialize(serializer, input);
+        }
+        numElementsEmitted++;
+        return tryDeserialize(serializer, input);
+    }
+
+    private OUT tryDeserialize(TypeSerializer<OUT> serializer, DataInputView 
input)
+            throws IOException {
+        try {
+            return serializer.deserialize(input);
+        } catch (EOFException eof) {
+            throw new NoSuchElementException(
+                    "Reached the end of the collection. This could be caused 
by issues with the serializer or by calling the map() function more times than 
there are elements in the collection. Make sure that you set the number of 
records to be produced by the DataGeneratorSource equal to the number of 
elements in the collection.");
+        } catch (Exception e) {
+            throw new IOException(
+                    "Failed to deserialize an element from the source. "
+                            + "If you are using user-defined serialization 
(Value and Writable types), check the "
+                            + "serialization functions.\nSerializer is "
+                            + serializer,
+                    e);
+        }
+    }
+
+    // For backward compatibility: Supports legacy usage of
+    // StreamExecutionEnvironment#fromElements() which lacked type information 
and relied on the
+    // returns() method. See FLINK-21386 for details.
+    @Override
+    public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+        Preconditions.checkState(
+                elements != null,
+                "The output type must be specified before the job graph is shipped to the cluster");
+        checkIterable(elements, outTypeInfo.getTypeClass());
+        TypeSerializer<OUT> newSerializer = 
outTypeInfo.createSerializer(executionConfig);
+        if (Objects.equals(serializer, newSerializer)) {
+            return;
+        }
+        serializer = newSerializer;
+        try {
+            serializeElements(elements);
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    private void trySerialize(Iterable<OUT> elements) {
+        try {
+            serializeElements(elements);
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Verifies that all elements in the iterable are non-null and are of the given class or a
+     * subclass thereof.
+     *
+     * @param elements The iterable to check.
+     * @param viewedAs The class to which the elements must be assignable.
+     * @param <OUT> The generic type of the iterable to be checked.
+     */
+    public static <OUT> void checkIterable(Iterable<OUT> elements, Class<?> 
viewedAs) {
+        for (OUT elem : elements) {
+            if (elem == null) {
+                throw new IllegalArgumentException("The collection contains a 
null element");
+            }
+
+            if (!viewedAs.isAssignableFrom(elem.getClass())) {
+                throw new IllegalArgumentException(
+                        "The elements in the collection are not all subclasses 
of "
+                                + viewedAs.getCanonicalName());
+            }
+        }
+    }
+}
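
For reference, the new function plugs into the FLIP-27 DataGeneratorSource as
in the integration tests above. A minimal sketch, assuming an existing
StreamExecutionEnvironment env (the source name is illustrative); the record
count handed to the source must equal the number of elements, otherwise map()
runs past the serialized collection and fails with NoSuchElementException:

    FromElementsGeneratorFunction<String> fn =
            new FromElementsGeneratorFunction<>(Types.STRING, "foo", "bar", "baz");
    DataGeneratorSource<String> source =
            new DataGeneratorSource<>(fn, 3, Types.STRING); // count == number of elements
    DataStream<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "from-elements");
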
diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
index 8f128414efb..a344eb635ad 100644
--- 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.datagen.source;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -34,6 +35,7 @@ import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 
 import java.util.Collection;
 
@@ -92,15 +94,18 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Experimental
 public class DataGeneratorSource<OUT>
         implements Source<OUT, NumberSequenceSplit, 
Collection<NumberSequenceSplit>>,
-                ResultTypeQueryable<OUT> {
+                ResultTypeQueryable<OUT>,
+                OutputTypeConfigurable<OUT> {
 
     private static final long serialVersionUID = 1L;
 
     private final SourceReaderFactory<OUT, NumberSequenceSplit> 
sourceReaderFactory;
-    private final TypeInformation<OUT> typeInfo;
+    private TypeInformation<OUT> typeInfo;
 
     private final NumberSequenceSource numberSource;
 
+    private final GeneratorFunction<Long, OUT> generatorFunction;
+
     /**
      * Instantiates a new {@code DataGeneratorSource}.
      *
@@ -130,23 +135,41 @@ public class DataGeneratorSource<OUT>
             TypeInformation<OUT> typeInfo) {
         this(
                 new GeneratorSourceReaderFactory<>(generatorFunction, 
rateLimiterStrategy),
+                generatorFunction,
                 count,
                 typeInfo);
-        ClosureCleaner.clean(
-                generatorFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
         ClosureCleaner.clean(
                 rateLimiterStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
     private DataGeneratorSource(
             SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
+            GeneratorFunction<Long, OUT> generatorFunction,
             long count,
             TypeInformation<OUT> typeInfo) {
         this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
-        ClosureCleaner.clean(
-                sourceReaderFactory, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+        this.generatorFunction = checkNotNull(generatorFunction);
         this.typeInfo = checkNotNull(typeInfo);
         this.numberSource = new NumberSequenceSource(0, count - 1);
+        ClosureCleaner.clean(
+                generatorFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+        ClosureCleaner.clean(
+                sourceReaderFactory, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+        this.typeInfo = outTypeInfo;
+        if (generatorFunction instanceof OutputTypeConfigurable) {
+            ((OutputTypeConfigurable<OUT>) generatorFunction)
+                    .setOutputType(outTypeInfo, executionConfig);
+        }
+    }
+
+    @VisibleForTesting
+    public GeneratorFunction<Long, OUT> getGeneratorFunction() {
+        return generatorFunction;
     }
 
     // ------------------------------------------------------------------------
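
The setOutputType hook above is what makes returns() work on streams backed by
this source. A hedged sketch of the effect, assuming an existing environment
env (customTypeInfo stands in for any compatible TypeInformation<String> and is
hypothetical):

    DataStreamSource<String> source = env.fromData("a", "b");
    // At StreamGraph generation time, returns() is forwarded through
    // SourceOperatorFactory#setOutputType to this source, which re-types itself
    // and re-configures an OutputTypeConfigurable generator function (e.g.
    // FromElementsGeneratorFunction re-serializes its elements).
    source.returns(customTypeInfo);
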
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java
 
b/flink-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
similarity index 82%
rename from 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java
rename to 
flink-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
index 2ff3ceabcd6..d2850706397 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java
+++ 
b/flink-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.java.typeutils;
+package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -27,6 +27,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
  * at {@code org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
  * cases where the output type is specified by the returns method and, thus, 
after the stream
  * operator has been created.
+ *
+ * <p>NOTE: this class was moved from flink-streaming-java to the same package in flink-core.
+ * This was required to avoid cyclic dependencies for internal connectors such as
+ * flink-connector-datagen, which is used by flink-streaming-java but also relies on {@code
+ * OutputTypeConfigurable}. Since flink-core is a dependency of flink-streaming-java, this move
+ * does not introduce breaking changes.
  */
 @PublicEvolving
 public interface OutputTypeConfigurable<OUT> {
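
Besides operators, sources and generator functions can now implement this
interface as well. A sketch of a custom generator function opting into the
hook, following the pattern of FromElementsGeneratorFunction (class name and
body are illustrative; imports from flink-core and flink-connector-datagen are
elided):

    /** Illustrative only: remembers the output type pushed in via returns(). */
    class TypedValuesGeneratorFunction
            implements GeneratorFunction<Long, String>, OutputTypeConfigurable<String> {

        // Would typically drive serializer selection; defaults to String.
        private TypeInformation<String> outputType = Types.STRING;

        @Override
        public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig config) {
            this.outputType = outTypeInfo; // invoked during StreamGraph generation
        }

        @Override
        public String map(Long index) {
            return "value-" + index;
        }
    }
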
diff --git 
a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
 
b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index df3453dd730..eb1e38651d6 100644
--- 
a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ 
b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -41,7 +41,7 @@ class 
StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
         # is deprecated, exclude them.
         return {'getLastJobExecutionResult', 'getId', 'getIdString',
                 'createCollectionsEnvironment', 'createLocalEnvironment',
-                'createRemoteEnvironment', 'addOperator', 'fromElements',
+                'createRemoteEnvironment', 'addOperator', 'fromElements', 
'fromData',
                 'resetContextEnvironment', 'getCachedFiles', 
'generateSequence',
                 'getNumberOfExecutionRetries', 'getStreamGraph', 
'fromParallelCollection',
                 'readFileStream', 'isForceCheckpointing', 'readFile', 'clean',
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 4a7644dfb45..542e05141fb 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -88,6 +88,12 @@ under the License.
                        <version>3.6.1</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-datagen</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
                <!-- test dependencies -->
 
                <dependency>
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index e297f86f6c0..5e1a9ad59a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -37,7 +37,7 @@ import 
org.apache.flink.streaming.api.transformations.SourceTransformation;
 @Public
 public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
 
-    private final boolean isParallel;
+    private boolean isParallel;
 
     public DataStreamSource(
             StreamExecutionEnvironment environment,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 343308edcf2..7ce26a89d93 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -62,6 +62,8 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.StateChangelogOptions;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
@@ -1120,6 +1122,114 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
     // Data stream creations
     // 
--------------------------------------------------------------------------------------------
 
+    /**
+     * Creates a new data stream that contains the given elements. The elements must all be of
+     * the same type, for example, all {@link String} or all {@link Integer}.
+     *
+     * <p>The framework will try to determine the exact type from the elements. In case of
+     * generic elements, it may be necessary to manually supply the type information via {@link
+     * #fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...)}.
+     *
+     * <p>NOTE: This creates a non-parallel data stream source by default 
(parallelism of one).
+     * Adjustment of parallelism is supported via {@code setParallelism()} on 
the result.
+     *
+     * @param data The array of elements to create the data stream from.
+     * @param <OUT> The type of the returned data stream
+     * @return The data stream representing the given array of elements
+     */
+    @SafeVarargs
+    public final <OUT> DataStreamSource<OUT> fromData(OUT... data) {
+        if (data.length == 0) {
+            throw new IllegalArgumentException(
+                    "fromData needs at least one element as argument");
+        }
+
+        TypeInformation<OUT> typeInfo;
+        try {
+            typeInfo = TypeExtractor.getForObject(data[0]);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not create TypeInformation for type "
+                            + data[0].getClass().getName()
+                            + "; please specify the TypeInformation manually 
via "
+                            + "StreamExecutionEnvironment#fromData(TypeInformation, OUT...)",
+                    e);
+        }
+        return fromData(Arrays.asList(data), typeInfo);
+    }
+
+    /**
+     * Creates a new data stream that contains the given elements. The elements must be of the
+     * {@code typeInfo} type, or of a subclass thereof. The sequence of elements must not be
+     * empty.
+     *
+     * <p>NOTE: This creates a non-parallel data stream source by default 
(parallelism of one).
+     * Adjustment of parallelism is supported via {@code setParallelism()} on 
the result.
+     *
+     * @param typeInfo The type information of the elements.
+     * @param data The array of elements to create the data stream from.
+     * @param <OUT> The type of the returned data stream
+     * @return The data stream representing the given array of elements
+     */
+    @SafeVarargs
+    public final <OUT> DataStreamSource<OUT> fromData(TypeInformation<OUT> 
typeInfo, OUT... data) {
+        if (data.length == 0) {
+            throw new IllegalArgumentException(
+                    "fromData needs at least one element as argument");
+        }
+        return fromData(Arrays.asList(data), typeInfo);
+    }
+
+    private <OUT> DataStreamSource<OUT> fromData(
+            Collection<OUT> data, TypeInformation<OUT> typeInfo) {
+        Preconditions.checkNotNull(data, "Collection must not be null");
+
+        FromElementsGeneratorFunction<OUT> generatorFunction =
+                new FromElementsGeneratorFunction<>(typeInfo, getConfig(), 
data);
+
+        DataGeneratorSource<OUT> generatorSource =
+                new DataGeneratorSource<>(generatorFunction, data.size(), 
typeInfo);
+
+        return fromSource(
+                        generatorSource,
+                        WatermarkStrategy.forMonotonousTimestamps(),
+                        "Collection Source")
+                .setParallelism(1);
+    }
+
+    /**
+     * Creates a new data stream that contains the given elements. The framework determines the
+     * type from the base class supplied by the user. The elements must be instances of that
+     * base class or of one of its subclasses. The sequence of elements must not be empty.
+     *
+     * <p>NOTE: This creates a non-parallel data stream source by default 
(parallelism of one).
+     * Adjustment of parallelism is supported via {@code setParallelism()} on 
the result.
+     *
+     * @param type The base class of the elements in the collection.
+     * @param data The array of elements to create the data stream from.
+     * @param <OUT> The type of the returned data stream
+     * @return The data stream representing the given array of elements
+     */
+    @SafeVarargs
+    public final <OUT> DataStreamSource<OUT> fromData(Class<OUT> type, OUT... 
data) {
+        if (data.length == 0) {
+            throw new IllegalArgumentException(
+                    "fromData needs at least one element as argument");
+        }
+
+        TypeInformation<OUT> typeInfo;
+        try {
+            typeInfo = TypeExtractor.getForClass(type);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not create TypeInformation for type "
+                            + type.getName()
+                            + "; please specify the TypeInformation manually 
via "
+                            + "StreamExecutionEnvironment#fromData(TypeInformation, OUT...)",
+                    e);
+        }
+        return fromData(Arrays.asList(data), typeInfo);
+    }
+
     /**
      * Creates a new data stream that contains a sequence of numbers. This is 
a parallel source, if
      * you manually set the parallelism to {@code 1} (using {@link
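
For context, the three new overloads in use, in a brief sketch (variable names
are illustrative):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Type information is extracted from the first element.
    DataStreamSource<String> strings = env.fromData("a", "b", "c");

    // Explicit type information, e.g. for generic element types.
    DataStreamSource<String> typed = env.fromData(TypeInformation.of(String.class), "a", "b");

    // Base-class variant: elements may be subclasses of the given type.
    DataStreamSource<Number> numbers = env.fromData(Number.class, 1, 2.0, 3L);

Each resulting source is non-parallel (parallelism 1) by default; the
parallelism can be adjusted on the result via setParallelism().
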
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
deleted file mode 100644
index f358aea0c92..00000000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * Stream operators can implement this interface if they need access to the 
output type information
- * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
- * cases where the output type is specified by the returns method and, thus, 
after the stream
- * operator has been created.
- *
- * @deprecated Use {@link 
org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead
- */
-@Deprecated
-@PublicEvolving
-public interface OutputTypeConfigurable<OUT> {
-
-    /**
-     * Is called by the {@link 
org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer,
-     * String, StreamOperator, TypeInformation, TypeInformation, String)} 
method when the {@link
-     * org.apache.flink.streaming.api.graph.StreamGraph} is generated. The 
method is called with the
-     * output {@link TypeInformation} which is also used for the {@link
-     * org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
-     *
-     * @param outTypeInfo Output type information of the {@link
-     *     org.apache.flink.streaming.runtime.tasks.StreamTask}
-     * @param executionConfig Execution configuration
-     */
-    void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig 
executionConfig);
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
index fbb734eb07e..95329baaef4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 
@@ -110,21 +109,13 @@ public class SimpleOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
 
     @Override
     public boolean isOutputTypeConfigurable() {
-        return operator instanceof OutputTypeConfigurable
-                || operator // legacy kept for compatibility purposes
-                        instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+        return operator instanceof OutputTypeConfigurable;
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public void setOutputType(TypeInformation<OUT> type, ExecutionConfig 
executionConfig) {
-        if (operator instanceof OutputTypeConfigurable) {
-            ((OutputTypeConfigurable<OUT>) operator).setOutputType(type, 
executionConfig);
-        } else if (operator // legacy kept for compatibility purposes
-                instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable) {
-            
((org.apache.flink.streaming.api.operators.OutputTypeConfigurable<OUT>) 
operator)
-                    .setOutputType(type, executionConfig);
-        }
+        ((OutputTypeConfigurable<OUT>) operator).setOutputType(type, 
executionConfig);
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 247cb456d88..8ec3f3c9a9d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -158,9 +157,7 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
 
     @Override
     public boolean isOutputTypeConfigurable() {
-        return source instanceof OutputTypeConfigurable
-                || source // legacy kept for compatibility purposes
-                        instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+        return source instanceof OutputTypeConfigurable;
     }
 
     @Override
@@ -168,10 +165,6 @@ public class SourceOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OU
     public void setOutputType(TypeInformation<OUT> type, ExecutionConfig 
executionConfig) {
         if (source instanceof OutputTypeConfigurable) {
             ((OutputTypeConfigurable<OUT>) source).setOutputType(type, 
executionConfig);
-        } else if (source // legacy kept for compatibility purposes
-                instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable) {
-            
((org.apache.flink.streaming.api.operators.OutputTypeConfigurable<OUT>) source)
-                    .setOutputType(type, executionConfig);
         }
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 0061ead05be..721f01b1fd7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -657,13 +657,6 @@ public class DataStreamTest extends TestLogger {
                         .getStreamNode(sink.getTransformation().getId())
                         .getParallelism());
 
-        try {
-            src.setParallelism(3);
-            fail();
-        } catch (IllegalArgumentException success) {
-            // do nothing
-        }
-
         DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
         parallelSource.sinkTo(new DiscardingSink<Long>());
         assertEquals(7, 
getStreamGraph(env).getStreamNode(parallelSource.getId()).getParallelism());
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index f04ce7a7883..d6cf08c7cc0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -23,12 +23,14 @@ import 
org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -46,6 +48,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
@@ -89,10 +92,9 @@ class StreamExecutionEnvironmentTest {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         DataStreamSource<String> source = env.fromElements("a", "b");
 
-        FromElementsFunction<String> elementsFunction =
-                (FromElementsFunction<String>) 
getFunctionFromDataSource(source);
-        assertThat(elementsFunction.getSerializer())
-                
.isEqualTo(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(env.getConfig()));
+        DataGeneratorSource<String> generatorSource = 
getSourceFromStream(source);
+
+        
assertThat(generatorSource.getProducedType()).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
     }
 
     @Test
@@ -103,12 +105,33 @@ class StreamExecutionEnvironmentTest {
 
         source.returns(customType);
 
-        FromElementsFunction<String> elementsFunction =
-                (FromElementsFunction<String>) 
getFunctionFromDataSource(source);
-        assertThat(elementsFunction.getSerializer())
-                
.isNotEqualTo(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(env.getConfig()));
-        assertThat(elementsFunction.getSerializer())
-                .isEqualTo(customType.createSerializer(env.getConfig()));
+        DataGeneratorSource<String> generatorSource = 
getSourceFromStream(source);
+        source.sinkTo(new DiscardingSink<>());
+        env.getStreamGraph();
+
+        
assertThat(generatorSource.getProducedType()).isNotEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
+        assertThat(generatorSource.getProducedType()).isEqualTo(customType);
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    void testFromElementsPostConstructionTypeIncompatible() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<String> source = env.fromElements("a", "b");
+        source.returns((TypeInformation) BasicTypeInfo.INT_TYPE_INFO);
+        source.sinkTo(new DiscardingSink<>());
+
+        assertThatThrownBy(env::getStreamGraph)
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("not all subclasses of 
java.lang.Integer");
+    }
+
+    @Test
+    void testFromElementsNullElement() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        assertThatThrownBy(() -> env.fromElements("a", null, "c"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("contains a null element");
     }
 
     @Test
@@ -171,7 +194,7 @@ class StreamExecutionEnvironmentTest {
         
assertThat(getFunctionFromDataSource(src2)).isInstanceOf(StatefulSequenceSource.class);
 
         DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-        
assertThat(getFunctionFromDataSource(src3)).isInstanceOf(FromElementsFunction.class);
+        
assertThat(getSourceFromDataSourceTyped(src3)).isInstanceOf(DataGeneratorSource.class);
 
         DataStreamSource<Long> src4 = env.fromCollection(list);
         
assertThat(getFunctionFromDataSource(src4)).isInstanceOf(FromElementsFunction.class);
@@ -511,6 +534,18 @@ class StreamExecutionEnvironmentTest {
         return (SourceFunction<T>) operator.getUserFunction();
     }
 
+    @SuppressWarnings("unchecked")
+    private static <T, S extends Source<T, ?, ?>> S 
getSourceFromStream(DataStream<T> stream) {
+        return (S) ((SourceTransformation<T, ?, ?>) 
stream.getTransformation()).getSource();
+    }
+
+    private static <T> Source<T, ?, ?> getSourceFromDataSourceTyped(
+            DataStreamSource<T> dataStreamSource) {
+        dataStreamSource.sinkTo(new DiscardingSink<>());
+        dataStreamSource.getExecutionEnvironment().getStreamGraph();
+        return ((SourceTransformation<T, ?, ?>) 
dataStreamSource.getTransformation()).getSource();
+    }
+
     private static class DummySplittableIterator<T> extends 
SplittableIterator<T> {
         private static final long serialVersionUID = 1312752876092210499L;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index b4b51053459..c97d1efa5a0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -52,10 +52,10 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.CacheTransformation;
 import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
@@ -158,7 +158,8 @@ public class StreamGraphGeneratorTest extends TestLogger {
                     
Assertions.assertThat(node.getBufferTimeout()).isEqualTo(77L);
                     break;
                 default:
-                    
Assertions.assertThat(node.getOperator()).isInstanceOf(StreamSource.class);
+                    Assertions.assertThat(node.getOperatorFactory())
+                            .isInstanceOf(SourceOperatorFactory.class);
             }
         }
     }
@@ -566,15 +567,15 @@ public class StreamGraphGeneratorTest extends TestLogger {
         int maxParallelism = 42;
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStream<Integer> input1 = env.fromElements(1, 2, 3, 
4).setMaxParallelism(128);
-        DataStream<Integer> input2 = env.fromElements(1, 2, 3, 
4).setMaxParallelism(129);
+        DataStream<Long> input1 = env.fromSequence(1, 
4).setMaxParallelism(128);
+        DataStream<Long> input2 = env.fromSequence(1, 
4).setMaxParallelism(129);
 
         env.getConfig().setMaxParallelism(maxParallelism);
 
-        DataStream<Integer> keyedResult =
+        DataStream<Long> keyedResult =
                 input1.connect(input2)
                         .keyBy(value -> value, value -> value)
-                        .map(new NoOpIntCoMap());
+                        .map(new NoOpLongCoMap());
 
         keyedResult.sinkTo(new DiscardingSink<>());
 
@@ -1151,14 +1152,14 @@ public class StreamGraphGeneratorTest extends 
TestLogger {
         }
     }
 
-    static class NoOpIntCoMap implements CoMapFunction<Integer, Integer, 
Integer> {
+    static class NoOpLongCoMap implements CoMapFunction<Long, Long, Long> {
         private static final long serialVersionUID = 1886595528149124270L;
 
-        public Integer map1(Integer value) throws Exception {
+        public Long map1(Long value) throws Exception {
             return value;
         }
 
-        public Integer map2(Integer value) throws Exception {
+        public Long map2(Long value) throws Exception {
             return value;
         }
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index e4c3bdf133e..22b14f63afb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -85,6 +85,7 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -1113,9 +1114,10 @@ class StreamingJobGraphGeneratorTest {
 
     @Test
     void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
+        // TODO: this test can be removed when the legacy SourceFunction API 
gets removed
         StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);
 
-        chainEnv.fromElements(1)
+        chainEnv.addSource(new LegacySource())
                 .map((x) -> x)
                 // not chainable because of YieldingOperatorFactory and legacy 
source
                 .transform(
@@ -1161,9 +1163,10 @@ class StreamingJobGraphGeneratorTest {
      */
     @Test
     void testYieldingOperatorProperlyChainedOnLegacySources() {
+        // TODO: this test can be removed when the legacy SourceFunction API 
gets removed
         StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);
 
-        chainEnv.fromElements(1)
+        chainEnv.addSource(new LegacySource())
                 .map((x) -> x)
                 // should automatically break chain here
                 .transform("test", BasicTypeInfo.INT_TYPE_INFO, new 
YieldingTestOperatorFactory<>())
@@ -2652,4 +2655,13 @@ class StreamingJobGraphGeneratorTest {
             return new HashSet<>(completedClusterDatasetIds);
         }
     }
+
+    private static class LegacySource implements SourceFunction<Integer> {
+        @Override
+        public void run(SourceContext<Integer> sourceContext) {
+            sourceContext.collect(1);
+        }
+
+        @Override
+        public void cancel() {}
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
index 31b7ebf7273..6caf9a5a3aa 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -124,7 +125,8 @@ class MultipleInputNodeCreationProcessorTest extends 
TableTestBase {
     }
 
     private void createNonChainableStream(TableTestUtil util) {
-        DataStreamSource<Integer> dataStream = 
util.getStreamEnv().fromElements(1, 2, 3);
+        DataStreamSource<Integer> dataStream =
+                util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));
         TableTestUtil.createTemporaryView(
                 util.tableEnv(),
                 "nonChainableStream",
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 89114013722..d1c26ecfe68 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -284,6 +284,13 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- For UpsertTestDynamicTableSinkITCase -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -305,7 +312,7 @@ under the License.
                        <artifactId>reflections</artifactId>
                </dependency>
 
-    </dependencies>
+       </dependencies>
 
        <build>
                <plugins>
@@ -710,6 +717,42 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>${avro.version}</version>
+                               <executions>
+                                       <execution>
+                                               <id>generate-avro-test-sources</id>
+                                               <phase>generate-test-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+                                                       <testOutputDirectory>${project.basedir}/target/generated-test-sources/avro</testOutputDirectory>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>build-helper-maven-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>add-avro-test-source</id>
+                                               <phase>generate-test-sources</phase>
+                                               <goals>
+                                                       <goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>${project.build.directory}/generated-test-sources/avro</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 
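
For reference, the two plugins above wire Avro code generation into the test build: the avro-maven-plugin schema goal compiles every *.avsc under src/test/resources/avro during generate-test-sources, and build-helper-maven-plugin registers the output directory as an additional test-source root. A sketch of what that enables, assuming Avro's standard SpecificRecord code generation for the user.avsc schema shown later in this commit:

    // Hypothetical usage of the class generated from user.avsc; the package
    // follows the schema's namespace.
    import org.apache.flink.connector.datagen.source.generated.User;

    User user = User.newBuilder().setName("Foo").setAge(40).build();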
diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
index 4d9c3dabfd3..db768bcc18f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.test.completeness;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.table.dataview.ListViewTypeInfo;
 import org.apache.flink.table.dataview.MapViewTypeInfo;
 import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
@@ -78,7 +80,9 @@ public class TypeInfoTestCoverageTest extends TestLogger {
                         SortedMapTypeInfo.class.getName(),
                         ExternalTypeInfo.class.getName(),
                         BigDecimalTypeInfo.class.getName(),
-                        DecimalDataTypeInfo.class.getName());
+                        DecimalDataTypeInfo.class.getName(),
+                        GenericRecordAvroTypeInfo.class.getName(),
+                        AvroTypeInfo.class.getName());
 
         // check if a test exists for each type information
         for (Class<? extends TypeInformation> typeInfo : typeInfos) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index 20b01b4df57..f71b60a08a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -65,6 +65,7 @@ import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNodeSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.heap.TestDuplicateSerializer;
 import org.apache.flink.runtime.state.ttl.TtlStateFactory;
@@ -195,7 +196,8 @@ public class TypeSerializerTestCoverageTest extends TestLogger {
                         RowDataSerializer.class.getName(),
                         DecimalDataSerializer.class.getName(),
                         SharedBufferNode.SharedBufferNodeSerializer.class.getName(),
-                        NFA.NFASerializer.class.getName());
+                        NFA.NFASerializer.class.getName(),
+                        AvroSerializer.class.getName());
 
         //  type serializer whitelist for TypeSerializerUpgradeTestBase test coverage
         final List<String> typeSerializerUpgradeTestBaseWhitelist =
@@ -258,7 +260,8 @@ public class TypeSerializerTestCoverageTest extends TestLogger {
                         RowDataSerializer.class.getName(),
                         DecimalDataSerializer.class.getName(),
                         SharedBufferNode.SharedBufferNodeSerializer.class.getName(),
-                        NFA.NFASerializer.class.getName());
+                        NFA.NFASerializer.class.getName(),
+                        AvroSerializer.class.getName());
 
         // check if a test exists for each type serializer
         for (Class<? extends TypeSerializer> typeSerializer : typeSerializers) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java
index 33668f6c5b2..206725c0154 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java
@@ -21,42 +21,46 @@ import org.apache.flink.client.deployment.executors.RemoteExecutor;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.InputStream;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration tests for {@link StreamExecutionEnvironment}. */
 public class StreamExecutionEnvironmentITCase {
 
-    // We use our own miniClusterResource because we wan't to connect to it using a remote executor.
-    @ClassRule
-    public static MiniClusterWithClientResource miniClusterResource =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    public static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
                             .setNumberSlotsPerTaskManager(1)
                             .build());
 
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
     @Test
-    public void executeThrowsProgramInvocationException() throws Exception {
-        UnmodifiableConfiguration clientConfiguration =
-                miniClusterResource.getClientConfiguration();
-        Configuration config = new Configuration(clientConfiguration);
+    public void executeThrowsProgramInvocationException() {
+        Configuration config = new Configuration(MINI_CLUSTER.getClientConfiguration());
         config.set(DeploymentOptions.TARGET, RemoteExecutor.NAME);
         config.setBoolean(DeploymentOptions.ATTACHED, true);
 
         // Create the execution environment explicitly from a Configuration so we know that we
         // don't get some other subclass. If we just did
         // StreamExecutionEnvironment.getExecutionEnvironment() we would get a
-        // TestStreamEnvironment that the MiniClusterResource created. We want to test the behaviour
+        // TestStreamEnvironment that the MiniClusterExtension created. We want to test the
+        // behaviour
         // of the base environment, though.
         StreamExecutionEnvironment env = new StreamExecutionEnvironment(config);
 
@@ -67,7 +71,52 @@ public class StreamExecutionEnvironmentITCase {
                         })
                 .print();
 
-        thrown.expect(ProgramInvocationException.class);
-        env.execute();
+        assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroGenericRecordsInFromElementsDoesNotFailDueToKryoFallback() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Schema schema = getSchemaFromResources("/avro/user.avsc");
+        GenericRecord user1 =
+                new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build();
+        GenericRecord user2 =
+                new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build();
+        GenericRecord[] data = {user1, user2};
+        DataStream<GenericRecord> stream =
+                env.fromElements(new GenericRecordAvroTypeInfo(schema), data);
+
+        List<GenericRecord> result = stream.executeAndCollect(data.length + 1);
+
+        assertThat(result).containsExactly(data);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroGenericRecordsInFromElementsDoesNotFailDueToKryoFallbackUsingReturns()
+            throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Schema schema = getSchemaFromResources("/avro/user.avsc");
+        GenericRecord user1 =
+                new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build();
+        GenericRecord user2 =
+                new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build();
+        GenericRecord[] data = {user1, user2};
+        DataStream<GenericRecord> stream =
+                env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema));
+
+        List<GenericRecord> result = stream.executeAndCollect(data.length + 1);
+
+        assertThat(result).containsExactly(data);
+    }
+
+    private Schema getSchemaFromResources(String path) throws Exception {
+        try (InputStream schemaStream = getClass().getResourceAsStream(path)) {
+            if (schemaStream == null) {
+                throw new IllegalStateException("Could not find " + path + " in classpath");
+            }
+            return new Schema.Parser().parse(schemaStream);
+        }
     }
 }
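
Condensed, the two tests above exercise the same guarantee in two ways. A sketch reusing the names from the tests (env, schema, user1, user2); either form gives the stream an explicit GenericRecordAvroTypeInfo so type extraction never falls back to Kryo:

    // Option 1: pass the TypeInformation up front via the new fromElements overload.
    DataStream<GenericRecord> viaOverload =
            env.fromElements(new GenericRecordAvroTypeInfo(schema), user1, user2);

    // Option 2: create the stream first, then attach the type with returns().
    DataStream<GenericRecord> viaReturns =
            env.fromElements(user1, user2).returns(new GenericRecordAvroTypeInfo(schema));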
diff --git a/flink-tests/src/test/resources/avro/user.avsc b/flink-tests/src/test/resources/avro/user.avsc
new file mode 100644
index 00000000000..504a06b9340
--- /dev/null
+++ b/flink-tests/src/test/resources/avro/user.avsc
@@ -0,0 +1,9 @@
+{
+  "namespace": "org.apache.flink.connector.datagen.source.generated",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "age", "type": "int"}
+  ]
+}
diff --git a/pom.xml b/pom.xml
index ec709084fc9..a35bdb6a0b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1622,6 +1622,8 @@ under the License.
 						<exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/answer_set/*</exclude>
 						<exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/query/*</exclude>
 						<exclude>flink-table/flink-table-code-splitter/src/test/resources/**</exclude>
+						<exclude>flink-tests/src/test/resources/avro/*.avsc</exclude>
+						<exclude>flink-connectors/flink-connector-datagen-test/src/test/resources/avro/*.avsc</exclude>
 
 						<!-- ArchUnit violation stores -->
 						<exclude>**/archunit-violations/**</exclude>
@@ -2294,6 +2296,7 @@ under the License.
 								<exclude>@org.apache.flink.annotation.PublicEvolving</exclude>
 								<exclude>@org.apache.flink.annotation.Internal</exclude>
 								<!-- MARKER: start exclusions; these will be wiped by tools/releasing/update_japicmp_configuration.sh -->
+								<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromElements(org.apache.flink.api.common.typeinfo.TypeInformation,java.lang.Object[])</exclude>
 								<!-- MARKER: end exclusions -->
 							</excludes>
 							<accessModifier>public</accessModifier>
