This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ad64e72cf3eaede9a4db6313e72f22340a133b00 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Thu Jun 22 20:06:10 2023 +0200 [FLINK-28050][connectors] Introduce FLIP-27 alternative to StreamExecutionEnvironment#fromElements() --- .../e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 | 8 +- .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 87 +++++---- .../flink-architecture-tests-production/pom.xml | 5 + flink-architecture-tests/pom.xml | 7 + .../6dae736c-6957-4d04-93bf-d7ebc5ca97ab | 0 .../f2b2daa5-8ca8-4a74-bb17-cb9f49e3e0e2 | 0 .../archunit-violations/stored.rules | 4 + .../flink-connector-datagen-test/pom.xml | 49 ++++- .../FromElementsGeneratorSourceITCase.java | 122 +++++++++++++ .../datagen/source/DataGeneratorSourceITCase.java | 56 +++++- .../src/test/resources/avro/user.avsc | 9 + .../functions/FromElementsGeneratorFunction.java | 200 +++++++++++++++++++++ .../datagen/source/DataGeneratorSource.java | 35 +++- .../api/operators}/OutputTypeConfigurable.java | 8 +- ...st_stream_execution_environment_completeness.py | 2 +- flink-streaming-java/pom.xml | 6 + .../streaming/api/datastream/DataStreamSource.java | 2 +- .../environment/StreamExecutionEnvironment.java | 110 ++++++++++++ .../api/operators/OutputTypeConfigurable.java | 49 ----- .../api/operators/SimpleOperatorFactory.java | 13 +- .../api/operators/SourceOperatorFactory.java | 9 +- .../apache/flink/streaming/api/DataStreamTest.java | 7 - .../StreamExecutionEnvironmentTest.java | 57 ++++-- .../api/graph/StreamGraphGeneratorTest.java | 19 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 16 +- .../MultipleInputNodeCreationProcessorTest.java | 4 +- flink-tests/pom.xml | 45 ++++- .../completeness/TypeInfoTestCoverageTest.java | 6 +- .../TypeSerializerTestCoverageTest.java | 7 +- .../api/StreamExecutionEnvironmentITCase.java | 87 +++++++-- flink-tests/src/test/resources/avro/user.avsc | 9 + pom.xml | 3 + 32 files changed, 871 insertions(+), 170 deletions(-) diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 index 1fea7816ed1..dbca0076986 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5 @@ -17,10 +17,10 @@ Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServi Method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl.revokeResourceManagerLeadership()> calls method <org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.revokeLeadership()> in (EmbeddedHaServicesWithLeadershipControl.java:102) Method <org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl.listDataSets()> calls method <org.apache.flink.runtime.io.network.partition.DataSetMetaInfo.withNumRegisteredPartitions(int, int)> in (ResourceManagerPartitionTrackerImpl.java:286) Method <org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.closeAsync(long)> calls method <org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.quiesce()> in (RecreateOnResetOperatorCoordinator.java:361) -Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerConfiguration.java:244) -Method 
<org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerConfiguration.java:246) -Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerServices.java:433) -Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerServices.java:431) +Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerConfiguration.java:241) +Method <org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration.fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, java.lang.String, java.io.File)> calls method 
<org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerConfiguration.java:243) +Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, int)> in (TaskManagerServices.java:450) +Method <org.apache.flink.runtime.taskexecutor.TaskManagerServices.createTaskSlotTable(int, org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec, long, int, java.util.concurrent.Executor)> calls method <org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec)> in (TaskManagerServices.java:448) Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.asClassLoader()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:295) Method <org.apache.flink.streaming.api.operators.SourceOperator$1$1.registerReleaseHookIfAbsent(java.lang.String, java.lang.Runnable)> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getRuntimeContext()> in (SourceOperator.java:301) Method <org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init()> calls method <org.apache.flink.streaming.api.operators.SourceOperator.getSourceReader()> in (SourceOperatorStreamTask.java:95) diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index 0c69f24bc8b..95c466ee545 100644 --- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -64,6 +64,18 @@ Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher. Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.<init>(int, org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, org.apache.flink.connector.base.source.reader.splitreader.SplitReader, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer, boolean)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (SplitFetcher.java:97) Constructor <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.<init>(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, java.util.function.Supplier, org.apache.flink.configuration.Configuration, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitFetcherManager.java:0) Constructor <org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.<init>(int)> calls method <org.apache.flink.util.Preconditions.checkArgument(boolean, java.lang.Object)> in (FutureCompletingBlockingQueue.java:114) +Constructor <org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory, org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (DataGeneratorSource.java:154) +Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory, org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (DataGeneratorSource.java:156) +Constructor <org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory, org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (DataGeneratorSource.java:150) +Constructor <org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory, org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (DataGeneratorSource.java:151) +Constructor <org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.api.connector.source.SourceReaderFactory, org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (DataGeneratorSource.java:152) +Constructor <org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method <org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy.noOp()> in (DataGeneratorSource.java:120) +Constructor 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (DataGeneratorSource.java:141) +Constructor <org.apache.flink.connector.datagen.source.DataGeneratorSource.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, long, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy, org.apache.flink.api.common.typeinfo.TypeInformation)> has parameter of type <org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy> in (DataGeneratorSource.java:0) +Constructor <org.apache.flink.connector.datagen.source.GeneratingIteratorSourceReader.<init>(org.apache.flink.api.connector.source.SourceReaderContext, org.apache.flink.connector.datagen.source.GeneratorFunction)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (GeneratingIteratorSourceReader.java:46) +Constructor <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (GeneratorSourceReaderFactory.java:54) +Constructor <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (GeneratorSourceReaderFactory.java:55) +Constructor 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> has parameter of type <org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy> in (GeneratorSourceReaderFactory.java:0) Constructor <org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long)> depends on component type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGenTableSource.java:0) Constructor <org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long)> has parameter of type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (DataGenTableSource.java:0) Constructor <org.apache.flink.connector.datagen.table.DataGenVisitorBase.<init>(java.lang.String, org.apache.flink.configuration.ReadableConfig)> calls constructor <org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.<init>()> in (DataGenVisitorBase.java:49) @@ -213,6 +225,7 @@ Constructor <org.apache.flink.connector.file.table.stream.compact.CompactBucketW Constructor <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter, org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter, org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter$1)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in (CompactBucketWriter.java:0) Constructor 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter, org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter, org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter$1)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter> in (CompactBucketWriter.java:0) Constructor <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.<init>(org.apache.flink.table.connector.sink.DynamicTableSink$DataStructureConverter, java.lang.String, boolean)> calls constructor <org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.<init>(java.lang.String, boolean)> in (PrintTableSinkFactory.java:173) +Field <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.rateLimiterStrategy> has type <org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy> in (GeneratorSourceReaderFactory.java:0) Field <org.apache.flink.connector.datagen.table.DataGenTableSource.fieldGenerators> depends on component type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGenTableSource.java:0) Field <org.apache.flink.connector.datagen.table.DataGenTableSource.fieldGenerators> has type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (DataGenTableSource.java:0) Field <org.apache.flink.connector.datagen.table.DataGeneratorContainer.generator> has type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGeneratorContainer.java:0) @@ -274,16 +287,22 @@ Method <org.apache.flink.connector.base.source.hybrid.HybridSource$HybridSourceB Method <org.apache.flink.connector.base.source.hybrid.HybridSource$HybridSourceBuilder.addSource(org.apache.flink.connector.base.source.hybrid.HybridSource$SourceFactory, org.apache.flink.api.connector.source.Boundedness)> calls method 
<org.apache.flink.util.Preconditions.checkArgument(boolean, java.lang.Object)> in (HybridSource.java:242) Method <org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(java.util.List)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (HybridSourceReader.java:153) Method <org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(int)> calls method <org.apache.flink.util.Preconditions.checkArgument(boolean)> in (HybridSourceReader.java:206) -Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> calls method <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)> in (HybridSourceSplitEnumerator.java:446) -Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (HybridSourceSplitEnumerator.java:438) -Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> checks instanceof <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in (HybridSourceSplitEnumerator.java:438) +Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> calls method <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)> in (HybridSourceSplitEnumerator.java:447) +Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (HybridSourceSplitEnumerator.java:439) +Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> checks instanceof <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in (HybridSourceSplitEnumerator.java:439) Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSplitRequest(int, java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean)> in (HybridSourceSplitEnumerator.java:110) Method <org.apache.flink.connector.base.source.hybrid.SwitchedSources.put(int, org.apache.flink.api.connector.source.Source)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (SwitchedSources.java:48) Method <org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(int)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String, [Ljava.lang.Object;)> in (SwitchedSources.java:38) Method <org.apache.flink.connector.base.source.reader.SourceReaderBase.moveToNextSplit(org.apache.flink.connector.base.source.reader.RecordsWithSplitIds, org.apache.flink.api.connector.source.ReaderOutput)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (SourceReaderBase.java:229) Method <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.getNumAliveFetchers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitFetcherManager.java:0) Method 
<org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.take()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FutureCompletingBlockingQueue.java:0) -Method <org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> calls constructor <org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, long, java.lang.Long)> in (DataGenTableSource.java:71) +Method <org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.getSerializer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FromElementsGeneratorFunction.java:0) +Method <org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.serializeElements(java.lang.Iterable)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (FromElementsGeneratorFunction.java:97) +Method <org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.setOutputType(org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.api.common.ExecutionConfig)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (FromElementsGeneratorFunction.java:151) +Method <org.apache.flink.connector.datagen.source.DataGeneratorSource.getGeneratorFunction()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DataGeneratorSource.java:0) +Method <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.createReader(org.apache.flink.api.connector.source.SourceReaderContext)> calls constructor <org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.<init>(org.apache.flink.api.connector.source.SourceReader, org.apache.flink.api.connector.source.util.ratelimit.RateLimiter)> in (GeneratorSourceReaderFactory.java:63) +Method 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.createReader(org.apache.flink.api.connector.source.SourceReaderContext)> calls method <org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy.createRateLimiter(int)> in (GeneratorSourceReaderFactory.java:62) +Method <org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> calls constructor <org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, long, java.lang.Long)> in (DataGenTableSource.java:67) Method <org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> has return type <org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource> in (DataGenTableSource.java:0) Method <org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DataGenTableSource.java:0) Method <org.apache.flink.connector.datagen.table.DataGeneratorContainer.getGenerator()> has return type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGeneratorContainer.java:0) @@ -748,8 +767,8 @@ Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.processE Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.PartitionCommitInfo>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (PartitionCommitter.java:0) Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(PartitionCommitter.java:0) Method <org.apache.flink.connector.file.table.stream.PartitionTimeCommitPredicate.isPartitionCommittable(org.apache.flink.connector.file.table.stream.PartitionCommitPredicate$PredicateContext)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues(org.apache.flink.core.fs.Path)> in (PartitionTimeCommitPredicate.java:71) -Method <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger$1.currentProcTime()> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (ProcTimeCommitTrigger.java:110) -Method <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.addPartition(java.lang.String)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (ProcTimeCommitTrigger.java:75) +Method <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger$1.currentProcTime()> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (ProcTimeCommitTrigger.java:111) +Method <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.addPartition(java.lang.String)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (ProcTimeCommitTrigger.java:76) Method <org.apache.flink.connector.file.table.stream.StreamingFileWriter.closePartFileForPartitions()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.closePartFileForBucket(java.lang.Object)> in (StreamingFileWriter.java:130) Method <org.apache.flink.connector.file.table.stream.StreamingFileWriter.closePartFileForPartitions()> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (StreamingFileWriter.java:126) Method <org.apache.flink.connector.file.table.stream.StreamingFileWriter.commitUpToCheckpoint(long)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (StreamingFileWriter.java:154) @@ -765,14 +784,14 @@ Method <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter Method <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.commit()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.closeForCommit()> in (CompactBucketWriter.java:49) Method <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.factory(org.apache.flink.util.function.SupplierWithException)> has generic parameter type <org.apache.flink.util.function.SupplierWithException<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<T, java.lang.String>, java.io.IOException>> with type argument depending on <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in (CompactBucketWriter.java:0) Method <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.write(java.lang.Object)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.write(java.lang.Object, long)> in (CompactBucketWriter.java:44) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:185) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:193) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> 
in (CompactCoordinator.java:115) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:115) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (CompactCoordinator.java:115) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactCoordinator.java:106) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (CompactCoordinator.java:106) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactCoordinator.java:126) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:186) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:194) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:116) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:116) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (CompactCoordinator.java:116) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactCoordinator.java:107) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (CompactCoordinator.java:107) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactCoordinator.java:127) 
Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.compact.CompactMessages$CoordinatorInput>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactCoordinator.java:0) Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactCoordinator.java:0) Method <org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactFileWriter.java:63) @@ -780,19 +799,19 @@ Method <org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.c Method <org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (CompactFileWriter.java:63) Method <org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.onPartFileOpened(java.lang.String, org.apache.flink.core.fs.Path)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactFileWriter.java:52) Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.convertFromUncompacted(org.apache.flink.core.fs.Path)> calls method <org.apache.flink.util.Preconditions.checkArgument(boolean, java.lang.Object)> in (CompactOperator.java:211) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactOperator.java:161) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()> in (CompactOperator.java:159) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (CompactOperator.java:160) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:108) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:108) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (CompactOperator.java:108) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactOperator.java:102) -Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in (CompactOperator.java:138) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()> in (CompactOperator.java:139) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()> in (CompactOperator.java:125) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (CompactOperator.java:124) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactOperator.java:120) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.StreamTask.getEnvironment()> in (CompactOperator.java:137) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactOperator.java:162) +Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()> in (CompactOperator.java:160) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (CompactOperator.java:161) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:109) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:109) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.ExecutionConfig)> in (CompactOperator.java:109) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactOperator.java:103) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method 
<org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in (CompactOperator.java:139) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()> in (CompactOperator.java:140) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getIndexOfThisSubtask()> in (CompactOperator.java:126) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getNumberOfParallelSubtasks()> in (CompactOperator.java:125) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactOperator.java:121) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.StreamTask.getEnvironment()> in (CompactOperator.java:138) Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.compact.CompactMessages$CoordinatorOutput>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactOperator.java:0) Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactOperator.java:0) Method <org.apache.flink.connector.file.table.utils.CompactFileUtils.doSingleFileMove(org.apache.flink.core.fs.FileSystem, org.apache.flink.core.fs.Path, org.apache.flink.core.fs.Path)> calls method <org.apache.flink.util.IOUtils.copyBytes(java.io.InputStream, java.io.OutputStream, boolean)> in (CompactFileUtils.java:117) @@ -807,10 +826,10 @@ Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCo Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:43) Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:44) Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.XZInputStreamFactory.getInstance()> in (StandardDeCompressors.java:46) -Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:51) -Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(PartitionTimeCommitTrigger.java:55) -Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (PartitionTimeCommitTrigger.java:55) -Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (PartitionTimeCommitTrigger.java:51) -Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (ProcTimeCommitTrigger.java:46) -Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (ProcTimeCommitTrigger.java:46) -Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (ProcTimeCommitTrigger.java:46) +Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:52) +Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:56) +Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (PartitionTimeCommitTrigger.java:56) +Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (PartitionTimeCommitTrigger.java:52) +Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (ProcTimeCommitTrigger.java:47) +Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (ProcTimeCommitTrigger.java:47) +Static Initializer <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (ProcTimeCommitTrigger.java:47) \ No newline at end of file diff --git a/flink-architecture-tests/flink-architecture-tests-production/pom.xml b/flink-architecture-tests/flink-architecture-tests-production/pom.xml index fc0bd3c1046..4fe46212390 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/pom.xml +++ b/flink-architecture-tests/flink-architecture-tests-production/pom.xml @@ -138,6 +138,11 @@ under the License. 
<groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-datagen</artifactId> + </dependency> </dependencies> <build> diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml index 34ffe550245..5b72c68b25e 100644 --- a/flink-architecture-tests/pom.xml +++ b/flink-architecture-tests/pom.xml @@ -153,6 +153,13 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-datagen</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <!-- Test Utils --> <dependency> diff --git a/flink-connectors/flink-connector-datagen-test/archunit-violations/6dae736c-6957-4d04-93bf-d7ebc5ca97ab b/flink-connectors/flink-connector-datagen-test/archunit-violations/6dae736c-6957-4d04-93bf-d7ebc5ca97ab new file mode 100644 index 00000000000..e69de29bb2d diff --git a/flink-connectors/flink-connector-datagen-test/archunit-violations/f2b2daa5-8ca8-4a74-bb17-cb9f49e3e0e2 b/flink-connectors/flink-connector-datagen-test/archunit-violations/f2b2daa5-8ca8-4a74-bb17-cb9f49e3e0e2 new file mode 100644 index 00000000000..e69de29bb2d diff --git a/flink-connectors/flink-connector-datagen-test/archunit-violations/stored.rules b/flink-connectors/flink-connector-datagen-test/archunit-violations/stored.rules new file mode 100644 index 00000000000..551a540797d --- /dev/null +++ b/flink-connectors/flink-connector-datagen-test/archunit-violations/stored.rules @@ -0,0 +1,4 @@ +# +#Sat Oct 14 00:17:16 CEST 2023 +ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=6dae736c-6957-4d04-93bf-d7ebc5ca97ab +Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=f2b2daa5-8ca8-4a74-bb17-cb9f49e3e0e2 diff --git a/flink-connectors/flink-connector-datagen-test/pom.xml 
b/flink-connectors/flink-connector-datagen-test/pom.xml index 5b89683ba20..20bdb4c6d16 100644 --- a/flink-connectors/flink-connector-datagen-test/pom.xml +++ b/flink-connectors/flink-connector-datagen-test/pom.xml @@ -29,7 +29,7 @@ </parent> <artifactId>flink-connector-datagen-tests</artifactId> - <name>Flink : Connectors : Datagen</name> + <name>Flink : Connectors : Datagen Tests</name> <packaging>jar</packaging> @@ -71,6 +71,13 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <!-- ArchUit test dependencies --> <dependency> @@ -91,4 +98,44 @@ </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>${avro.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> + <testOutputDirectory>${project.basedir}/target/generated-test-sources/avro</testOutputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-avro-test-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>${project.build.directory}/generated-test-sources/avro</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </project> diff --git a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorSourceITCase.java 
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorSourceITCase.java new file mode 100644 index 00000000000..4c0df21de4b --- /dev/null +++ b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorSourceITCase.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.generated.User; +import org.apache.flink.formats.avro.typeutils.AvroSerializer; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.List; +import java.util.NoSuchElementException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Integration tests for {@code FromElementsGeneratorSourceITCase}. 
*/ +class FromElementsGeneratorSourceITCase extends TestLogger { + + private static final int PARALLELISM = 1; + + @RegisterExtension + private static final MiniClusterExtension miniClusterExtension = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .build()); + + // ------------------------------------------------------------------------ + + @Test + @DisplayName("Produces expected String output") + void testBasicType() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + String[] data = {"Foo", "bar", "baz"}; + FromElementsGeneratorFunction<String> generatorFunction = + new FromElementsGeneratorFunction<>(Types.STRING, "Foo", "bar", "baz"); + DataGeneratorSource<String> dataGeneratorSource = + new DataGeneratorSource<>(generatorFunction, data.length, Types.STRING); + DataStream<String> stream = + env.fromSource( + dataGeneratorSource, WatermarkStrategy.noWatermarks(), "generator source"); + + List<String> result = stream.executeAndCollect(data.length + 1); + TypeSerializer<String> serializer = generatorFunction.getSerializer(); + + assertThat(serializer).isEqualTo(Types.STRING.createSerializer(new ExecutionConfig())); + assertThat(result).containsExactly(data); + } + + @Test + @DisplayName("Handles Avro data correctly") + void testAvroType() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + User user1 = new User("Foo", 1); + User user2 = new User("Bar", 2); + User[] data = {user1, user2}; + FromElementsGeneratorFunction<User> generatorFunction = + new FromElementsGeneratorFunction<>(TypeExtractor.createTypeInfo(User.class), data); + DataGeneratorSource<User> dataGeneratorSource = + new DataGeneratorSource<>( + generatorFunction, data.length, new 
AvroTypeInfo<>(User.class)); + DataStream<User> stream = + env.fromSource( + dataGeneratorSource, WatermarkStrategy.noWatermarks(), "generator source"); + + List<User> result = stream.executeAndCollect(data.length + 1); + TypeSerializer<User> serializer = generatorFunction.getSerializer(); + + assertThat(serializer).isInstanceOf(AvroSerializer.class); + assertThat(result).containsExactly(data); + } + + @Test + @DisplayName("Test exception when more elements are requested than available") + void testMoreElementsRequestedThanAvailable() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + String[] data = {"foo", "bar"}; + FromElementsGeneratorFunction<String> generatorFunction = + new FromElementsGeneratorFunction<>(Types.STRING, data); + DataGeneratorSource<String> dataGeneratorSource = + new DataGeneratorSource<>(generatorFunction, data.length + 1, Types.STRING); + DataStream<String> stream = + env.fromSource( + dataGeneratorSource, WatermarkStrategy.noWatermarks(), "generator source"); + + assertThatThrownBy(() -> stream.executeAndCollect(data.length + 1)) + .hasRootCauseInstanceOf(NoSuchElementException.class); + } +} diff --git a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java index 842982ac584..2aa1468caea 100644 --- a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java +++ b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java @@ -18,8 +18,11 @@ package org.apache.flink.connector.datagen.source; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import 
org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; @@ -30,6 +33,8 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -200,6 +205,55 @@ class DataGeneratorSourceITCase extends TestLogger { assertThat(results).hasSize(capacityPerCheckpoint); } + @Test + @DisplayName("Stream returns() call is propagated correctly.") + @SuppressWarnings("unchecked") + void testReturnsIsCorrectlyPropagated() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + OutputTypeConfigurableGeneratorFunction generatorFunction = + new OutputTypeConfigurableGeneratorFunction(BasicTypeInfo.STRING_TYPE_INFO); + + final DataStreamSource<Long> streamSource = + getGeneratorSourceStream(generatorFunction, env, 1); + streamSource.returns(BasicTypeInfo.LONG_TYPE_INFO).executeAndCollect(1); + + DataGeneratorSource<Long> dataGeneratorSource = + (DataGeneratorSource<Long>) + ((SourceTransformation<Long, ?, ?>) streamSource.getTransformation()) + .getSource(); + + assertThat(dataGeneratorSource.getProducedType()).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO); + 
assertThat(generatorFunction.getTypeInformation()).isEqualTo(BasicTypeInfo.LONG_TYPE_INFO); + } + + @SuppressWarnings("rawtypes") + private static class OutputTypeConfigurableGeneratorFunction + implements GeneratorFunction<Long, Long>, OutputTypeConfigurable<Long> { + + public OutputTypeConfigurableGeneratorFunction(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + private TypeInformation typeInformation; + + @Override + public Long map(Long value) throws Exception { + return 0L; + } + + @Override + public void setOutputType( + TypeInformation<Long> outTypeInfo, ExecutionConfig executionConfig) { + typeInformation = outTypeInfo; + } + + public TypeInformation getTypeInformation() { + return typeInformation; + } + } + private static class FirstCheckpointFilter implements FlatMapFunction<Long, Long>, CheckpointedFunction { @@ -221,7 +275,7 @@ class DataGeneratorSourceITCase extends TestLogger { public void initializeState(FunctionInitializationContext context) throws Exception {} } - private DataStream<Long> getGeneratorSourceStream( + private DataStreamSource<Long> getGeneratorSourceStream( GeneratorFunction<Long, Long> generatorFunction, StreamExecutionEnvironment env, long count) { diff --git a/flink-connectors/flink-connector-datagen-test/src/test/resources/avro/user.avsc b/flink-connectors/flink-connector-datagen-test/src/test/resources/avro/user.avsc new file mode 100644 index 00000000000..504a06b9340 --- /dev/null +++ b/flink-connectors/flink-connector-datagen-test/src/test/resources/avro/user.avsc @@ -0,0 +1,9 @@ +{ + "namespace": "org.apache.flink.connector.datagen.source.generated", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} + ] +} diff --git a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java new file mode 100644 index 00000000000..1acd4f8de0c --- /dev/null +++ b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import java.util.NoSuchElementException; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * <p>This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * @param <OUT> The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction<OUT> + implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + + /** The (de)serializer to be used for the data elements. */ + private @Nullable TypeSerializer<OUT> serializer; + + /** The actual data elements, in serialized form. 
*/ + private byte[] elementsSerialized; + + /** The number of elements emitted already. */ + private int numElementsEmitted; + + private transient Iterable<OUT> elements; + private transient DataInputView input; + + @SafeVarargs + public FromElementsGeneratorFunction(TypeInformation<OUT> typeInfo, OUT... elements) { + this(typeInfo, new ExecutionConfig(), Arrays.asList(elements)); + } + + public FromElementsGeneratorFunction( + TypeInformation<OUT> typeInfo, ExecutionConfig config, Iterable<OUT> elements) { + // must not have null elements and mixed elements + checkIterable(elements, typeInfo.getTypeClass()); + this.serializer = typeInfo.createSerializer(config); + this.elements = elements; + trySerialize(elements); + } + + @VisibleForTesting + @Nullable + public TypeSerializer<OUT> getSerializer() { + return serializer; + } + + private void serializeElements(Iterable<OUT> elements) throws IOException { + Preconditions.checkState(serializer != null, "serializer not set"); + LOG.info("Serializing elements using " + serializer); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + + try { + for (OUT element : elements) { + serializer.serialize(element, wrapper); + } + } catch (Exception e) { + throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); + } + this.elementsSerialized = baos.toByteArray(); + } + + @Override + public void open(SourceReaderContext readerContext) throws Exception { + ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized); + this.input = new DataInputViewStreamWrapper(bais); + } + + @Override + public OUT map(Long nextIndex) throws Exception { + // Move iterator to the required position in case of failure recovery + while (numElementsEmitted < nextIndex) { + numElementsEmitted++; + tryDeserialize(serializer, input); + } + numElementsEmitted++; + return tryDeserialize(serializer, input); + } + + private OUT 
tryDeserialize(TypeSerializer<OUT> serializer, DataInputView input) + throws IOException { + try { + return serializer.deserialize(input); + } catch (EOFException eof) { + throw new NoSuchElementException( + "Reached the end of the collection. This could be caused by issues with the serializer or by calling the map() function more times than there are elements in the collection. Make sure that you set the number of records to be produced by the DataGeneratorSource equal to the number of elements in the collection."); + } catch (Exception e) { + throw new IOException( + "Failed to deserialize an element from the source. " + + "If you are using user-defined serialization (Value and Writable types), check the " + + "serialization functions.\nSerializer is " + + serializer, + e); + } + } + + // For backward compatibility: Supports legacy usage of + // StreamExecutionEnvironment#fromElements() which lacked type information and relied on the + // returns() method. See FLINK-21386 for details. 
+ @Override + public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) { + Preconditions.checkState( + elements != null, + "The output type should've been specified before shipping the graph to the cluster"); + checkIterable(elements, outTypeInfo.getTypeClass()); + TypeSerializer<OUT> newSerializer = outTypeInfo.createSerializer(executionConfig); + if (Objects.equals(serializer, newSerializer)) { + return; + } + serializer = newSerializer; + try { + serializeElements(elements); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + private void trySerialize(Iterable<OUT> elements) { + try { + serializeElements(elements); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Verifies that all elements in the iterable are non-null, and are of the given class, or a + * subclass thereof. + * + * @param elements The iterable to check. + * @param viewedAs The class to which the elements must be assignable to. + * @param <OUT> The generic type of the iterable to be checked. 
+ */ + public static <OUT> void checkIterable(Iterable<OUT> elements, Class<?> viewedAs) { + for (OUT elem : elements) { + if (elem == null) { + throw new IllegalArgumentException("The collection contains a null element"); + } + + if (!viewedAs.isAssignableFrom(elem.getClass())) { + throw new IllegalArgumentException( + "The elements in the collection are not all subclasses of " + + viewedAs.getCanonicalName()); + } + } + } +} diff --git a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java index 8f128414efb..a344eb635ad 100644 --- a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java +++ b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.datagen.source; import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; @@ -34,6 +35,7 @@ import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import java.util.Collection; @@ -92,15 +94,18 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Experimental public class DataGeneratorSource<OUT> implements Source<OUT, NumberSequenceSplit, Collection<NumberSequenceSplit>>, - ResultTypeQueryable<OUT> { + ResultTypeQueryable<OUT>, + 
OutputTypeConfigurable<OUT> { private static final long serialVersionUID = 1L; private final SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory; - private final TypeInformation<OUT> typeInfo; + private TypeInformation<OUT> typeInfo; private final NumberSequenceSource numberSource; + private final GeneratorFunction<Long, OUT> generatorFunction; + /** * Instantiates a new {@code DataGeneratorSource}. * @@ -130,23 +135,41 @@ public class DataGeneratorSource<OUT> TypeInformation<OUT> typeInfo) { this( new GeneratorSourceReaderFactory<>(generatorFunction, rateLimiterStrategy), + generatorFunction, count, typeInfo); - ClosureCleaner.clean( - generatorFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); ClosureCleaner.clean( rateLimiterStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); } private DataGeneratorSource( SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory, + GeneratorFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) { this.sourceReaderFactory = checkNotNull(sourceReaderFactory); - ClosureCleaner.clean( - sourceReaderFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + this.generatorFunction = checkNotNull(generatorFunction); this.typeInfo = checkNotNull(typeInfo); this.numberSource = new NumberSequenceSource(0, count - 1); + ClosureCleaner.clean( + generatorFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean( + sourceReaderFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + } + + @Override + @SuppressWarnings("unchecked") + public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) { + this.typeInfo = outTypeInfo; + if (generatorFunction instanceof OutputTypeConfigurable) { + ((OutputTypeConfigurable<OUT>) generatorFunction) + .setOutputType(outTypeInfo, executionConfig); + } + } + + @VisibleForTesting + public GeneratorFunction<Long, OUT> getGeneratorFunction() { + return 
generatorFunction; } // ------------------------------------------------------------------------ diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java b/flink-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java similarity index 82% rename from flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java rename to flink-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java index 2ff3ceabcd6..d2850706397 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/OutputTypeConfigurable.java +++ b/flink-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.api.java.typeutils; +package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; @@ -27,6 +27,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; * at {@code org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for * cases where the output type is specified by the returns method and, thus, after the stream * operator has been created. + * + * <p>NOTE: this class was moved from flink-streaming-java to the same package in flink-core. This + * was required in order to avoid cyclic dependencies for internal connectors, such as + * flink-connector-datagen that is used in flink-streaming-java but also relies on {@code + * OutputTypeConfigurable}. Since flink-core is the dependency of flink-streaming-java, this does + * not introduce breaking changes. 
*/ @PublicEvolving public interface OutputTypeConfigurable<OUT> { diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py index df3453dd730..eb1e38651d6 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py @@ -41,7 +41,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase, # is deprecated, exclude them. return {'getLastJobExecutionResult', 'getId', 'getIdString', 'createCollectionsEnvironment', 'createLocalEnvironment', - 'createRemoteEnvironment', 'addOperator', 'fromElements', + 'createRemoteEnvironment', 'addOperator', 'fromElements', 'fromData', 'resetContextEnvironment', 'getCachedFiles', 'generateSequence', 'getNumberOfExecutionRetries', 'getStreamGraph', 'fromParallelCollection', 'readFileStream', 'isForceCheckpointing', 'readFile', 'clean', diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml index 4a7644dfb45..542e05141fb 100644 --- a/flink-streaming-java/pom.xml +++ b/flink-streaming-java/pom.xml @@ -88,6 +88,12 @@ under the License. 
<version>3.6.1</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-datagen</artifactId> + <version>${project.version}</version> + </dependency> + <!-- test dependencies --> <dependency> diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java index e297f86f6c0..5e1a9ad59a1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java @@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation; @Public public class DataStreamSource<T> extends SingleOutputStreamOperator<T> { - private final boolean isParallel; + private boolean isParallel; public DataStreamSource( StreamExecutionEnvironment environment, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 343308edcf2..7ce26a89d93 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -62,6 +62,8 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.core.execution.CacheSupportedPipelineExecutor; import 
org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.DetachedJobExecutionResult; @@ -1120,6 +1122,114 @@ public class StreamExecutionEnvironment implements AutoCloseable { // Data stream creations // -------------------------------------------------------------------------------------------- + /** + * Creates a new data stream that contains the given elements. The elements must all be of the + * same type, for example, all {@link String} or all {@link Integer}. + * + * <p>The framework will try to determine the exact type from the elements. In case of generic + * elements, it may be necessary to manually supply the type information via {@link + * #fromData(org.apache.flink.api.common.typeinfo.TypeInformation, OUT...)}. + * + * <p>NOTE: This creates a non-parallel data stream source by default (parallelism of one). + * Adjustment of parallelism is supported via {@code setParallelism()} on the result. + * + * @param data The array of elements to create the data stream from. + * @param <OUT> The type of the returned data stream + * @return The data stream representing the given array of elements + */ + @SafeVarargs + public final <OUT> DataStreamSource<OUT> fromData(OUT... data) { + if (data.length == 0) { + throw new IllegalArgumentException( + "fromElements needs at least one element as argument"); + } + + TypeInformation<OUT> typeInfo; + try { + typeInfo = TypeExtractor.getForObject(data[0]); + } catch (Exception e) { + throw new RuntimeException( + "Could not create TypeInformation for type " + + data[0].getClass().getName() + + "; please specify the TypeInformation manually via " + + "StreamExecutionEnvironment#fromData(Collection, TypeInformation)", + e); + } + return fromData(Arrays.asList(data), typeInfo); + } + + /** + * Creates a new data stream that contains the given elements. The elements must be of the + * {@code typeInfo} type, or a subclass thereof. The sequence of elements must not be empty.
 + * + * <p>NOTE: This creates a non-parallel data stream source by default (parallelism of one). + * Adjustment of parallelism is supported via {@code setParallelism()} on the result. + * + * @param typeInfo The type information of the elements. + * @param data The array of elements to create the data stream from. + * @param <OUT> The type of the returned data stream + * @return The data stream representing the given array of elements + */ + @SafeVarargs + public final <OUT> DataStreamSource<OUT> fromData(TypeInformation<OUT> typeInfo, OUT... data) { + if (data.length == 0) { + throw new IllegalArgumentException( + "fromElements needs at least one element as argument"); + } + return fromData(Arrays.asList(data), typeInfo); + } + + private <OUT> DataStreamSource<OUT> fromData( + Collection<OUT> data, TypeInformation<OUT> typeInfo) { + Preconditions.checkNotNull(data, "Collection must not be null"); + + FromElementsGeneratorFunction<OUT> generatorFunction = + new FromElementsGeneratorFunction<>(typeInfo, getConfig(), data); + + DataGeneratorSource<OUT> generatorSource = + new DataGeneratorSource<>(generatorFunction, data.size(), typeInfo); + + return fromSource( + generatorSource, + WatermarkStrategy.forMonotonousTimestamps(), + "Collection Source") + .setParallelism(1); + } + + /** + * Creates a new data stream that contains the given elements. The framework will determine the + * type according to the base type the user supplied. The elements must be of the same type as, + * or a subclass of, the base type. The sequence of elements must not be empty. + * + * <p>NOTE: This creates a non-parallel data stream source by default (parallelism of one). + * Adjustment of parallelism is supported via {@code setParallelism()} on the result. + * + * @param type The base class type of the elements in the collection. + * @param data The array of elements to create the data stream from.
+ * @param <OUT> The type of the returned data stream + * @return The data stream representing the given array of elements + */ + @SafeVarargs + public final <OUT> DataStreamSource<OUT> fromData(Class<OUT> type, OUT... data) { + if (data.length == 0) { + throw new IllegalArgumentException( + "fromElements needs at least one element as argument"); + } + + TypeInformation<OUT> typeInfo; + try { + typeInfo = TypeExtractor.getForClass(type); + } catch (Exception e) { + throw new RuntimeException( + "Could not create TypeInformation for type " + + type.getName() + + "; please specify the TypeInformation manually via " + + "StreamExecutionEnvironment#fromData(Collection, TypeInformation)", + e); + } + return fromData(Arrays.asList(data), typeInfo); + } + /** * Creates a new data stream that contains a sequence of numbers. This is a parallel source, if * you manually set the parallelism to {@code 1} (using {@link diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java deleted file mode 100644 index f358aea0c92..00000000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -/** - * Stream operators can implement this interface if they need access to the output type information - * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for - * cases where the output type is specified by the returns method and, thus, after the stream - * operator has been created. - * - * @deprecated Use {@link org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead - */ -@Deprecated -@PublicEvolving -public interface OutputTypeConfigurable<OUT> { - - /** - * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, - * String, StreamOperator, TypeInformation, TypeInformation, String)} method when the {@link - * org.apache.flink.streaming.api.graph.StreamGraph} is generated. The method is called with the - * output {@link TypeInformation} which is also used for the {@link - * org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer. 
- * - * @param outTypeInfo Output type information of the {@link - * org.apache.flink.streaming.runtime.tasks.StreamTask} - * @param executionConfig Execution configuration - */ - void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig); -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java index fbb734eb07e..95329baaef4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; -import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; @@ -110,21 +109,13 @@ public class SimpleOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU @Override public boolean isOutputTypeConfigurable() { - return operator instanceof OutputTypeConfigurable - || operator // legacy kept for compatibility purposes - instanceof org.apache.flink.streaming.api.operators.OutputTypeConfigurable; + return operator instanceof OutputTypeConfigurable; } @SuppressWarnings("unchecked") @Override public void setOutputType(TypeInformation<OUT> type, ExecutionConfig executionConfig) { - if (operator instanceof OutputTypeConfigurable) { - ((OutputTypeConfigurable<OUT>) operator).setOutputType(type, executionConfig); - } else if (operator // legacy kept for compatibility purposes - instanceof 
org.apache.flink.streaming.api.operators.OutputTypeConfigurable) { - ((org.apache.flink.streaming.api.operators.OutputTypeConfigurable<OUT>) operator) - .setOutputType(type, executionConfig); - } + ((OutputTypeConfigurable<OUT>) operator).setOutputType(type, executionConfig); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index 247cb456d88..8ec3f3c9a9d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -26,7 +26,6 @@ import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -158,9 +157,7 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU @Override public boolean isOutputTypeConfigurable() { - return source instanceof OutputTypeConfigurable - || source // legacy kept for compatibility purposes - instanceof org.apache.flink.streaming.api.operators.OutputTypeConfigurable; + return source instanceof OutputTypeConfigurable; } @Override @@ -168,10 +165,6 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU public void setOutputType(TypeInformation<OUT> type, ExecutionConfig executionConfig) { if (source instanceof OutputTypeConfigurable) { ((OutputTypeConfigurable<OUT>) source).setOutputType(type, executionConfig); - } else if (source // legacy 
kept for compatibility purposes - instanceof org.apache.flink.streaming.api.operators.OutputTypeConfigurable) { - ((org.apache.flink.streaming.api.operators.OutputTypeConfigurable<OUT>) source) - .setOutputType(type, executionConfig); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 0061ead05be..721f01b1fd7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -657,13 +657,6 @@ public class DataStreamTest extends TestLogger { .getStreamNode(sink.getTransformation().getId()) .getParallelism()); - try { - src.setParallelism(3); - fail(); - } catch (IllegalArgumentException success) { - // do nothing - } - DataStreamSource<Long> parallelSource = env.generateSequence(0, 0); parallelSource.sinkTo(new DiscardingSink<Long>()); assertEquals(7, getStreamGraph(env).getStreamNode(parallelSource.getId()).getParallelism()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java index f04ce7a7883..d6cf08c7cc0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java @@ -23,12 +23,14 @@ import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.java.typeutils.GenericTypeInfo; 
import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; @@ -46,6 +48,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; @@ -89,10 +92,9 @@ class StreamExecutionEnvironmentTest { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.fromElements("a", "b"); - FromElementsFunction<String> elementsFunction = - (FromElementsFunction<String>) getFunctionFromDataSource(source); - assertThat(elementsFunction.getSerializer()) - .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(env.getConfig())); + DataGeneratorSource<String> generatorSource = getSourceFromStream(source); + + assertThat(generatorSource.getProducedType()).isEqualTo(BasicTypeInfo.STRING_TYPE_INFO); } @Test @@ -103,12 +105,33 @@ class StreamExecutionEnvironmentTest { source.returns(customType); - FromElementsFunction<String> elementsFunction = - (FromElementsFunction<String>) getFunctionFromDataSource(source); - assertThat(elementsFunction.getSerializer()) - .isNotEqualTo(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(env.getConfig())); - assertThat(elementsFunction.getSerializer()) 
- .isEqualTo(customType.createSerializer(env.getConfig())); + DataGeneratorSource<String> generatorSource = getSourceFromStream(source); + source.sinkTo(new DiscardingSink<>()); + env.getStreamGraph(); + + assertThat(generatorSource.getProducedType()).isNotEqualTo(BasicTypeInfo.STRING_TYPE_INFO); + assertThat(generatorSource.getProducedType()).isEqualTo(customType); + } + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + void testFromElementsPostConstructionTypeIncompatible() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<String> source = env.fromElements("a", "b"); + source.returns((TypeInformation) BasicTypeInfo.INT_TYPE_INFO); + source.sinkTo(new DiscardingSink<>()); + + assertThatThrownBy(env::getStreamGraph) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("not all subclasses of java.lang.Integer"); + } + + @Test + void testFromElementsNullElement() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + assertThatThrownBy(() -> env.fromElements("a", null, "c")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("contains a null element"); } @Test @@ -171,7 +194,7 @@ class StreamExecutionEnvironmentTest { assertThat(getFunctionFromDataSource(src2)).isInstanceOf(StatefulSequenceSource.class); DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L); - assertThat(getFunctionFromDataSource(src3)).isInstanceOf(FromElementsFunction.class); + assertThat(getSourceFromDataSourceTyped(src3)).isInstanceOf(DataGeneratorSource.class); DataStreamSource<Long> src4 = env.fromCollection(list); assertThat(getFunctionFromDataSource(src4)).isInstanceOf(FromElementsFunction.class); @@ -511,6 +534,18 @@ class StreamExecutionEnvironmentTest { return (SourceFunction<T>) operator.getUserFunction(); } + @SuppressWarnings("unchecked") + private static <T, S extends Source<T, ?, ?>> S getSourceFromStream(DataStream<T> stream) { + return 
(S) ((SourceTransformation<T, ?, ?>) stream.getTransformation()).getSource(); + } + + private static <T> Source<T, ?, ?> getSourceFromDataSourceTyped( + DataStreamSource<T> dataStreamSource) { + dataStreamSource.sinkTo(new DiscardingSink<>()); + dataStreamSource.getExecutionEnvironment().getStreamGraph(); + return ((SourceTransformation<T, ?, ?>) dataStreamSource.getTransformation()).getSource(); + } + private static class DummySplittableIterator<T> extends SplittableIterator<T> { private static final long serialVersionUID = 1312752876092210499L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index b4b51053459..c97d1efa5a0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -52,10 +52,10 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; -import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.transformations.CacheTransformation; import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; @@ -158,7 +158,8 @@ public class StreamGraphGeneratorTest extends TestLogger { 
Assertions.assertThat(node.getBufferTimeout()).isEqualTo(77L); break; default: - Assertions.assertThat(node.getOperator()).isInstanceOf(StreamSource.class); + Assertions.assertThat(node.getOperatorFactory()) + .isInstanceOf(SourceOperatorFactory.class); } } } @@ -566,15 +567,15 @@ public class StreamGraphGeneratorTest extends TestLogger { int maxParallelism = 42; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128); - DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129); + DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128); + DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129); env.getConfig().setMaxParallelism(maxParallelism); - DataStream<Integer> keyedResult = + DataStream<Long> keyedResult = input1.connect(input2) .keyBy(value -> value, value -> value) - .map(new NoOpIntCoMap()); + .map(new NoOpLongCoMap()); keyedResult.sinkTo(new DiscardingSink<>()); @@ -1151,14 +1152,14 @@ public class StreamGraphGeneratorTest extends TestLogger { } } - static class NoOpIntCoMap implements CoMapFunction<Integer, Integer, Integer> { + static class NoOpLongCoMap implements CoMapFunction<Long, Long, Long> { private static final long serialVersionUID = 1886595528149124270L; - public Integer map1(Integer value) throws Exception { + public Long map1(Long value) throws Exception { return value; } - public Integer map2(Integer value) throws Exception { + public Long map2(Long value) throws Exception { return value; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index e4c3bdf133e..22b14f63afb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -85,6 +85,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -1113,9 +1114,10 @@ class StreamingJobGraphGeneratorTest { @Test void testYieldingOperatorNotChainableToTaskChainedToLegacySource() { + // TODO: this test can be removed when the legacy SourceFunction API gets removed StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1); - chainEnv.fromElements(1) + chainEnv.addSource(new LegacySource()) .map((x) -> x) // not chainable because of YieldingOperatorFactory and legacy source .transform( @@ -1161,9 +1163,10 @@ class StreamingJobGraphGeneratorTest { */ @Test void testYieldingOperatorProperlyChainedOnLegacySources() { + // TODO: this test can be removed when the legacy SourceFunction API gets removed StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1); - chainEnv.fromElements(1) + chainEnv.addSource(new LegacySource()) .map((x) -> x) // should automatically break chain here .transform("test", BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory<>()) @@ -2652,4 +2655,13 @@ class StreamingJobGraphGeneratorTest { return new HashSet<>(completedClusterDatasetIds); } } + + private static class LegacySource implements SourceFunction<Integer> { + public void run(SourceContext<Integer> sourceContext) { + sourceContext.collect(1); + } + + 
@Override + public void cancel() {} + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java index 31b7ebf7273..6caf9a5a3aa 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test; import java.io.File; import java.io.IOException; +import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; @@ -124,7 +125,8 @@ class MultipleInputNodeCreationProcessorTest extends TableTestBase { } private void createNonChainableStream(TableTestUtil util) { - DataStreamSource<Integer> dataStream = util.getStreamEnv().fromElements(1, 2, 3); + DataStreamSource<Integer> dataStream = + util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3)); TableTestUtil.createTemporaryView( util.tableEnv(), "nonChainableStream", diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 89114013722..d1c26ecfe68 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -284,6 +284,13 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <!-- For UpsertTestDynamicTableSinkITCase --> <dependency> <groupId>org.apache.flink</groupId> @@ -305,7 +312,7 @@ under the License. <artifactId>reflections</artifactId> </dependency> - </dependencies> + </dependencies> <build> <plugins> @@ -710,6 +717,42 @@ under the License. 
</execution> </executions> </plugin> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>${avro.version}</version> + <executions> + <execution> + <id>generate-avro-test-sources</id> + <phase>generate-test-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> + <testOutputDirectory>${project.basedir}/target/generated-test-sources/avro</testOutputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-avro-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>${project.build.directory}/generated-test-sources/avro</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java index 4d9c3dabfd3..db768bcc18f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java @@ -20,6 +20,8 @@ package org.apache.flink.test.completeness; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeInformationTestBase; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.table.dataview.ListViewTypeInfo; import org.apache.flink.table.dataview.MapViewTypeInfo; import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo; @@ -78,7 +80,9 @@ 
public class TypeInfoTestCoverageTest extends TestLogger { SortedMapTypeInfo.class.getName(), ExternalTypeInfo.class.getName(), BigDecimalTypeInfo.class.getName(), - DecimalDataTypeInfo.class.getName()); + DecimalDataTypeInfo.class.getName(), + GenericRecordAvroTypeInfo.class.getName(), + AvroTypeInfo.class.getName()); // check if a test exists for each type information for (Class<? extends TypeInformation> typeInfo : typeInfos) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java index 20b01b4df57..f71b60a08a8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java @@ -65,6 +65,7 @@ import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge; import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode; import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNodeSerializer; import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; +import org.apache.flink.formats.avro.typeutils.AvroSerializer; import org.apache.flink.runtime.state.ArrayListSerializer; import org.apache.flink.runtime.state.heap.TestDuplicateSerializer; import org.apache.flink.runtime.state.ttl.TtlStateFactory; @@ -195,7 +196,8 @@ public class TypeSerializerTestCoverageTest extends TestLogger { RowDataSerializer.class.getName(), DecimalDataSerializer.class.getName(), SharedBufferNode.SharedBufferNodeSerializer.class.getName(), - NFA.NFASerializer.class.getName()); + NFA.NFASerializer.class.getName(), + AvroSerializer.class.getName()); // type serializer whitelist for TypeSerializerUpgradeTestBase test coverage final List<String> typeSerializerUpgradeTestBaseWhitelist = @@ -258,7 +260,8 @@ public class TypeSerializerTestCoverageTest extends TestLogger { 
RowDataSerializer.class.getName(), DecimalDataSerializer.class.getName(), SharedBufferNode.SharedBufferNodeSerializer.class.getName(), - NFA.NFASerializer.class.getName()); + NFA.NFASerializer.class.getName(), + AvroSerializer.class.getName()); // check if a test exists for each type serializer for (Class<? extends TypeSerializer> typeSerializer : typeSerializers) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java index 33668f6c5b2..206725c0154 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java @@ -21,42 +21,46 @@ import org.apache.flink.client.deployment.executors.RemoteExecutor; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; -import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.junit5.MiniClusterExtension; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.InputStream; +import java.util.List; + +import 
static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration tests for {@link StreamExecutionEnvironment}. */ public class StreamExecutionEnvironmentITCase { - // We use our own miniClusterResource because we wan't to connect to it using a remote executor. - @ClassRule - public static MiniClusterWithClientResource miniClusterResource = - new MiniClusterWithClientResource( + @RegisterExtension + public static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(1) .build()); - @Rule public ExpectedException thrown = ExpectedException.none(); - @Test - public void executeThrowsProgramInvocationException() throws Exception { - UnmodifiableConfiguration clientConfiguration = - miniClusterResource.getClientConfiguration(); - Configuration config = new Configuration(clientConfiguration); + public void executeThrowsProgramInvocationException() { + Configuration config = new Configuration(MINI_CLUSTER.getClientConfiguration()); config.set(DeploymentOptions.TARGET, RemoteExecutor.NAME); config.setBoolean(DeploymentOptions.ATTACHED, true); // Create the execution environment explicitly from a Configuration so we know that we // don't get some other subclass. If we just did // StreamExecutionEnvironment.getExecutionEnvironment() we would get a - // TestStreamEnvironment that the MiniClusterResource created. We want to test the behaviour + // TestStreamEnvironment that the MiniClusterExtension created. We want to test the + // behaviour // of the base environment, though. 
StreamExecutionEnvironment env = new StreamExecutionEnvironment(config); @@ -67,7 +71,52 @@ public class StreamExecutionEnvironmentITCase { }) .print(); - thrown.expect(ProgramInvocationException.class); - env.execute(); + assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class); + } + + @Test + @SuppressWarnings("unchecked") + void testAvroGenericRecordsInFromElementsDoesNotFailDueToKryoFallback() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Schema schema = getSchemaFromResources("/avro/user.avsc"); + GenericRecord user1 = + new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build(); + GenericRecord user2 = + new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build(); + GenericRecord[] data = {user1, user2}; + DataStream<GenericRecord> stream = + env.fromElements(new GenericRecordAvroTypeInfo(schema), data); + + List<GenericRecord> result = stream.executeAndCollect(data.length + 1); + + assertThat(result).containsExactly(data); + } + + @Test + @SuppressWarnings("unchecked") + void testAvroGenericRecordsInFromElementsDoesNotFailDueToKryoFallbackUsingReturns() + throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Schema schema = getSchemaFromResources("/avro/user.avsc"); + GenericRecord user1 = + new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build(); + GenericRecord user2 = + new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build(); + GenericRecord[] data = {user1, user2}; + DataStream<GenericRecord> stream = + env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema)); + + List<GenericRecord> result = stream.executeAndCollect(data.length + 1); + + assertThat(result).containsExactly(data); + } + + private Schema getSchemaFromResources(String path) throws Exception { + try (InputStream schemaStream = getClass().getResourceAsStream(path)) { + 
if (schemaStream == null) { + throw new IllegalStateException("Could not find " + path + " in classpath"); + } + return new Schema.Parser().parse(schemaStream); + } } } diff --git a/flink-tests/src/test/resources/avro/user.avsc b/flink-tests/src/test/resources/avro/user.avsc new file mode 100644 index 00000000000..504a06b9340 --- /dev/null +++ b/flink-tests/src/test/resources/avro/user.avsc @@ -0,0 +1,9 @@ +{ + "namespace": "org.apache.flink.connector.datagen.source.generated", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} + ] +} diff --git a/pom.xml b/pom.xml index ec709084fc9..a35bdb6a0b5 100644 --- a/pom.xml +++ b/pom.xml @@ -1622,6 +1622,8 @@ under the License. <exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/answer_set/*</exclude> <exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/query/*</exclude> <exclude>flink-table/flink-table-code-splitter/src/test/resources/**</exclude> + <exclude>flink-tests/src/test/resources/avro/*.avsc</exclude> + <exclude>flink-connectors/flink-connector-datagen-test/src/test/resources/avro/*.avsc</exclude> <!-- ArchUnit violation stores --> <exclude>**/archunit-violations/**</exclude> @@ -2294,6 +2296,7 @@ under the License. <exclude>@org.apache.flink.annotation.PublicEvolving</exclude> <exclude>@org.apache.flink.annotation.Internal</exclude> <!-- MARKER: start exclusions; these will be wiped by tools/releasing/update_japicmp_configuration.sh --> + <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromElements(org.apache.flink.api.common.typeinfo.TypeInformation,java.lang.Object[])</exclude> <!-- MARKER: end exclusions --> </excludes> <accessModifier>public</accessModifier>