This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 02533d71806e  [SPARK-46777][SS] Refactor `StreamingDataSourceV2Relation` catalyst structure to be more on-par with the batch version
02533d71806e is described below

commit 02533d71806ec0be97ec793d680189093c9a0ecb
Author: jackierwzhang <ruowang.zh...@databricks.com>
AuthorDate: Mon Jan 22 18:58:55 2024 +0900

    [SPARK-46777][SS] Refactor `StreamingDataSourceV2Relation` catalyst structure to be more on-par with the batch version

    ### What changes were proposed in this pull request?
    This PR refactors `StreamingDataSourceV2Relation` into `StreamingDataSourceV2Relation` and `StreamingDataSourceV2ScanRelation` to achieve better parity with the batch version. This prepares the codebase for extending certain V2 optimization rules (e.g. `V2ScanRelationPushDown`) to streaming in the future.

    ### Why are the changes needed?
    As described above, we would like to start reusing certain V2 batch optimization rules for streaming relations.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    This is a pure refactoring; existing tests should be sufficient.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #44818 from jackierwzhang/spark-46777.

    Authored-by: jackierwzhang <ruowang.zh...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  7 +-
 .../catalyst/streaming/StreamingRelationV2.scala   |  4 +-
 .../datasources/v2/DataSourceV2Relation.scala      | 83 ++++++++++++++++------
 .../datasources/v2/DataSourceV2Strategy.scala      |  4 +-
 .../execution/streaming/MicroBatchExecution.scala  | 12 ++--
 .../sql/execution/streaming/ProgressReporter.scala |  4 +-
 .../streaming/continuous/ContinuousExecution.scala | 14 ++--
 .../sources/RateStreamProviderSuite.scala          |  4 +-
 .../streaming/sources/TextSocketStreamSuite.scala  |  4 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |  8 +--
 .../apache/spark/sql/streaming/StreamTest.scala    |  4 +-
 .../sql/streaming/StreamingQueryManagerSuite.scala |  4 +-
 .../streaming/test/DataStreamTableAPISuite.scala   |  2 +-
 13 files changed, 99 insertions(+), 55 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index cee0d9a3dd72..fb5e71a1e7b8 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.TestUtils
 import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, ASYNC_PROGRESS_TRACKING_ENABLED}
@@ -125,7 +125,8 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K
   val sources: Seq[SparkDataStream] = {
     query.get.logicalPlan.collect {
       case StreamingExecutionRelation(source: KafkaSource, _, _) => source
-      case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
+      case r: StreamingDataSourceV2ScanRelation
+        if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
           r.stream.isInstanceOf[KafkaContinuousStream] =>
         r.stream
     }
@@ -1654,7 +1655,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
         query.logicalPlan.exists {
-          case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[KafkaMicroBatchStream]
+          case r: StreamingDataSourceV2ScanRelation => r.stream.isInstanceOf[KafkaMicroBatchStream]
          case _ => false
        }
      }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
index ab0352b606e5..c1d7daa6cfcf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.streaming
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsMetadataColumns, Table, TableProvider}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
@@ -36,7 +36,7 @@ case class StreamingRelationV2(
     sourceName: String,
     table: Table,
     extraOptions: CaseInsensitiveStringMap,
-    output: Seq[Attribute],
+    output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
     v1Relation: Option[LogicalPlan])
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 9c7d776edc65..556283243f63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -32,21 +32,21 @@ import org.apache.spark.util.Utils
 /**
  * A logical plan representing a data source v2 table.
  *
- * @param table The table that this relation represents.
- * @param output the output attributes of this relation.
+ * @param table The table that this relation represents.
+ * @param output The output attributes of this relation.
  * @param catalog catalogPlugin for the table. None if no catalog is specified.
- * @param identifier the identifier for the table. None if no identifier is defined.
+ * @param identifier The identifier for the table. None if no identifier is defined.
  * @param options The options for this table operation. It's used to create fresh
  *                [[org.apache.spark.sql.connector.read.ScanBuilder]] and
  *                [[org.apache.spark.sql.connector.write.WriteBuilder]].
  */
-case class DataSourceV2Relation(
+abstract class DataSourceV2RelationBase(
     table: Table,
     output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
     options: CaseInsensitiveStringMap)
-  extends LeafNode with MultiInstanceRelation with NamedRelation with ExposesMetadataColumns {
+  extends LeafNode with MultiInstanceRelation with NamedRelation {
   import DataSourceV2Implicits._
@@ -54,14 +54,6 @@ case class DataSourceV2Relation(
     case c: FunctionCatalog => c
   }
-  override lazy val metadataOutput: Seq[AttributeReference] = table match {
-    case hasMeta: SupportsMetadataColumns =>
-      metadataOutputWithOutConflicts(
-        hasMeta.metadataColumns.toAttributes, hasMeta.canRenameConflictingMetadataColumns)
-    case _ =>
-      Nil
-  }
-
   override def name: String = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     (catalog, identifier) match {
@@ -98,11 +90,34 @@ case class DataSourceV2Relation(
       }
     }
   }
+}
+
+/**
+ * A specialization of [[DataSourceV2RelationBase]] that supports batch scan.
+ */
+case class DataSourceV2Relation(
+    table: Table,
+    override val output: Seq[AttributeReference],
+    catalog: Option[CatalogPlugin],
+    identifier: Option[Identifier],
+    options: CaseInsensitiveStringMap)
+  extends DataSourceV2RelationBase(table, output, catalog, identifier, options)
+  with ExposesMetadataColumns {
+
+  import DataSourceV2Implicits._
   override def newInstance(): DataSourceV2Relation = {
     copy(output = output.map(_.newInstance()))
   }
+  override lazy val metadataOutput: Seq[AttributeReference] = table match {
+    case hasMeta: SupportsMetadataColumns =>
+      metadataOutputWithOutConflicts(
+        hasMeta.metadataColumns.toAttributes, hasMeta.canRenameConflictingMetadataColumns)
+    case _ =>
+      Nil
+  }
+
   def withMetadataColumns(): DataSourceV2Relation = {
     val newMetadata = metadataOutput.filterNot(outputSet.contains)
     if (newMetadata.nonEmpty) {
@@ -152,21 +167,45 @@ case class DataSourceV2ScanRelation(
 }
 /**
- * A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
- *
- * Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan,
- * to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]]
- * after we figure out how to apply operator push-down for streaming data sources.
+ * A specialization of [[DataSourceV2RelationBase]] that supports streaming scan.
+ * It will be transformed to [[StreamingDataSourceV2ScanRelation]] during the planning phase of
+ * [[MicrobatchExecution]].
  */
 case class StreamingDataSourceV2Relation(
-    output: Seq[Attribute],
-    scan: Scan,
-    stream: SparkDataStream,
+    table: Table,
+    override val output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
+    options: CaseInsensitiveStringMap,
+    metadataPath: String)
+  extends DataSourceV2RelationBase(table, output, catalog, identifier, options) {
+
+  override def isStreaming: Boolean = true
+
+  override def newInstance(): StreamingDataSourceV2Relation = {
+    copy(output = output.map(_.newInstance()))
+  }
+}
+/**
+ * A specialization of [[DataSourceV2ScanRelation]] with the streaming bit set to true, as well
+ * as start and end offsets for Microbatch processing.
+ */
+case class StreamingDataSourceV2ScanRelation(
+    relation: StreamingDataSourceV2Relation,
+    scan: Scan,
+    output: Seq[AttributeReference],
+    stream: SparkDataStream,
     startOffset: Option[Offset] = None,
     endOffset: Option[Offset] = None)
-  extends LeafNode with MultiInstanceRelation {
+  extends LeafNode with MultiInstanceRelation with NamedRelation {
+
+  val (catalog, identifier) = (relation.catalog, relation.identifier)
+
+  override def name: String = relation.table.name()
+
+  override def simpleString(maxFields: Int): String = {
+    s"StreamingDataSourceV2ScanRelation${truncatedString(output, "[", ", ", "]", maxFields)} $name"
+  }
   override def isStreaming: Boolean = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index fe3140c8030a..3cf311017e5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -147,7 +147,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         StoragePartitionJoinParams(relation.keyGroupedPartitioning))
       withProjectAndFilter(project, postScanFilters, batchExec, !batchExec.supportsColumnar) :: Nil
-    case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
+    case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
       if r.startOffset.isDefined && r.endOffset.isDefined =>
       val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
@@ -157,7 +157,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       // Add a Project here to make sure we produce unsafe rows.
       withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
-    case PhysicalOperation(p, f, r: StreamingDataSourceV2Relation)
+    case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
       if r.startOffset.isDefined && r.endOffset.isEmpty =>
       val continuousStream = r.stream.asInstanceOf[ContinuousStream]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 1bd59e818be5..8c98ad5c47dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset =
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.sources.{WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
@@ -103,7 +103,7 @@ class MicroBatchExecution(
     var nextSourceId = 0L
     val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
     val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
-    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
     // We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
     // map as we go to ensure each identical relation gets the same StreamingExecutionRelation
     // object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
@@ -140,7 +140,9 @@ class MicroBatchExecution(
             // TODO: operator pushdown.
             val scan = table.newScanBuilder(options).build()
             val stream = scan.toMicroBatchStream(metadataPath)
-            StreamingDataSourceV2Relation(output, scan, stream, catalog, identifier)
+            val relation = StreamingDataSourceV2Relation(
+              table, output, catalog, identifier, options, metadataPath)
+            StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
           })
         } else if (v1.isEmpty) {
           throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(
@@ -163,7 +165,7 @@ class MicroBatchExecution(
       // v1 source
      case s: StreamingExecutionRelation => s.source
       // v2 source
-      case r: StreamingDataSourceV2Relation => r.stream
+      case r: StreamingDataSourceV2ScanRelation => r.stream
     }
     // Initializing TriggerExecutor relies on `sources`, hence calling this after initializing
@@ -706,7 +708,7 @@ class MicroBatchExecution(
           }
         // For v2 sources.
-        case r: StreamingDataSourceV2Relation =>
+        case r: StreamingDataSourceV2ScanRelation =>
           mutableNewData.get(r.stream).map {
             case OffsetHolder(start, end) =>
               r.copy(startOffset = Some(start), endOffset = Some(end))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index c01f156e3d70..ccbbf9a4d874 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReportsSinkMetrics, ReportsSourceMetrics, SparkDataStream}
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
+import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryProgressEvent}
 import org.apache.spark.util.Clock
@@ -329,7 +329,7 @@ trait ProgressReporter extends Logging {
     val onlyDataSourceV2Sources = {
       // Check whether the streaming query's logical plan has only V2 micro-batch data sources
       val allStreamingLeaves = logicalPlan.collect {
-        case s: StreamingDataSourceV2Relation => s.stream.isInstanceOf[MicroBatchStream]
+        case s: StreamingDataSourceV2ScanRelation => s.stream.isInstanceOf[MicroBatchStream]
         case _: StreamingExecutionRelation => false
       }
       allStreamingLeaves.forall(_ == true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index a2d9a6705f97..1de05931faf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Partitio
 import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.ArrayImplicits._
@@ -61,7 +61,7 @@ class ContinuousExecution(
   private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)
   override val logicalPlan: WriteToContinuousDataSource = {
-    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
     var nextSourceId = 0
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
@@ -79,12 +79,14 @@ class ContinuousExecution(
         // TODO: operator pushdown.
         val scan = table.newScanBuilder(options).build()
         val stream = scan.toContinuousStream(metadataPath)
-        StreamingDataSourceV2Relation(output, scan, stream, catalog, identifier)
+        val relation = StreamingDataSourceV2Relation(
+          table, output, catalog, identifier, options, metadataPath)
+        StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
       })
     }
     sources = _logicalPlan.collect {
-      case r: StreamingDataSourceV2Relation => r.stream.asInstanceOf[ContinuousStream]
+      case r: StreamingDataSourceV2ScanRelation => r.stream.asInstanceOf[ContinuousStream]
     }
     uniqueSources = sources.distinct.map(s => s -> ReadLimit.allAvailable()).toMap
@@ -197,7 +199,7 @@ class ContinuousExecution(
     }
     val withNewSources: LogicalPlan = logicalPlan transform {
-      case relation: StreamingDataSourceV2Relation =>
+      case relation: StreamingDataSourceV2ScanRelation =>
        val loggedOffset = offsets.offsets(0)
        val realOffset = loggedOffset.map(off => relation.stream.deserializeOffset(off.json))
        val startOffset = realOffset.getOrElse(relation.stream.initialOffset)
@@ -227,7 +229,7 @@ class ContinuousExecution(
     }
     val stream = withNewSources.collect {
-      case relation: StreamingDataSourceV2Relation =>
+      case relation: StreamingDataSourceV2ScanRelation =>
         relation.stream.asInstanceOf[ContinuousStream]
     }.head
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 1f5f3d76d104..69dc8c291c0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.functions._
@@ -43,7 +43,7 @@ class RateStreamProviderSuite extends StreamTest {
     override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
       assert(query.nonEmpty)
       val rateSource = query.get.logicalPlan.collect {
-        case r: StreamingDataSourceV2Relation
+        case r: StreamingDataSourceV2ScanRelation
           if r.stream.isInstanceOf[RateStreamMicroBatchStream] =>
           r.stream.asInstanceOf[RateStreamMicroBatchStream]
       }.head
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 15aa0a80c207..bfeca5851102 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
@@ -59,7 +59,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
       "Cannot add data when there is no query for finding the active socket source")
     val sources = query.get.logicalPlan.collect {
-      case r: StreamingDataSourceV2Relation
+      case r: StreamingDataSourceV2ScanRelation
         if r.stream.isInstanceOf[TextSocketMicroBatchStream] =>
         r.stream.asInstanceOf[TextSocketMicroBatchStream]
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b0e54737d104..1b0b53357e5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -511,7 +511,7 @@ class StreamSuite extends StreamTest {
       val explainWithoutExtended = q.explainInternal(false)
       // `extended = false` only displays the physical plan.
-      assert("StreamingDataSourceV2Relation".r
+      assert("StreamingDataSourceV2ScanRelation".r
         .findAllMatchIn(explainWithoutExtended).size === 0)
       assert("BatchScan".r
         .findAllMatchIn(explainWithoutExtended).size === 1)
@@ -521,7 +521,7 @@ class StreamSuite extends StreamTest {
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical
       // plan.
-      assert("StreamingDataSourceV2Relation".r
+      assert("StreamingDataSourceV2ScanRelation".r
         .findAllMatchIn(explainWithExtended).size === 3)
       assert("BatchScan".r
         .findAllMatchIn(explainWithExtended).size === 1)
@@ -566,7 +566,7 @@ class StreamSuite extends StreamTest {
       val explainWithoutExtended = q.explainInternal(false)
       // `extended = false` only displays the physical plan.
-      assert("StreamingDataSourceV2Relation".r
+      assert("StreamingDataSourceV2ScanRelation".r
         .findAllMatchIn(explainWithoutExtended).size === 0)
       assert("ContinuousScan".r
         .findAllMatchIn(explainWithoutExtended).size === 1)
@@ -574,7 +574,7 @@
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical
       // plan.
-      assert("StreamingDataSourceV2Relation".r
+      assert("StreamingDataSourceV2ScanRelation".r
         .findAllMatchIn(explainWithExtended).size === 3)
       assert("ContinuousScan".r
         .findAllMatchIn(explainWithExtended).size === 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7ee66d18d481..f4816b04bbb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.physical.AllTuples
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream}
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
@@ -702,7 +702,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
       // v1 source
       case r: StreamingExecutionRelation => r.source
       // v2 source
-      case r: StreamingDataSourceV2Relation => r.stream
+      case r: StreamingDataSourceV2ScanRelation => r.stream
       // We can add data to memory stream before starting it. Then the input plan has
       // not been processed by the streaming engine and contains `StreamingRelationV2`.
       case r: StreamingRelationV2 if r.sourceName == "memory" =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 754f55202254..53cbbe6e786f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, Encoders}
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.BlockingSource
@@ -467,7 +467,7 @@ class StreamingQueryManagerSuite extends StreamTest {
       if (withError) {
         logDebug(s"Terminating query ${queryToStop.name} with error")
         queryToStop.asInstanceOf[StreamingQueryWrapper].streamingQuery.logicalPlan.collect {
-          case r: StreamingDataSourceV2Relation =>
+          case r: StreamingDataSourceV2ScanRelation =>
             r.stream.asInstanceOf[MemoryStream[Int]].addData(0)
         }
       } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 5a4f386f1d1d..eecc9468649d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -433,7 +433,7 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     val explainWithExtended = sq.explainInternal(true)
     // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
     // plan.
-    assert("StreamingDataSourceV2Relation".r
+    assert("StreamingDataSourceV2ScanRelation".r
       .findAllMatchIn(explainWithExtended).size === 3)
    // WriteToMicroBatchDataSource is used for both parsed and analyzed logical plan
    assert("WriteToMicroBatchDataSource".r

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
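
For readers who want the shape of the change without reading the whole diff: the streaming side now mirrors the batch split between DataSourceV2Relation and DataSourceV2ScanRelation. Below is a minimal sketch of how MicroBatchExecution builds the new pair of nodes, condensed from the hunks above (identifiers as they appear there; surrounding plumbing omitted, so this is an illustration rather than the exact committed code):

    // Planning a StreamingRelationV2 for a microbatch query (simplified):
    // TODO in the original code: operator pushdown.
    val scan = table.newScanBuilder(options).build()
    val stream = scan.toMicroBatchStream(metadataPath)

    // Catalog-level node: like the batch DataSourceV2Relation, it now carries the
    // table, output attributes, catalog, identifier, options and the checkpoint
    // metadata path, so batch-style rules can reason about it.
    val relation = StreamingDataSourceV2Relation(
      table, output, catalog, identifier, options, metadataPath)

    // Scan-level node: wraps the relation together with the built Scan and the
    // SparkDataStream; startOffset/endOffset default to None and are filled in
    // for each microbatch.
    StreamingDataSourceV2ScanRelation(relation, scan, output, stream)

ContinuousExecution builds the same pair with scan.toContinuousStream(metadataPath), as shown in its hunk.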