HeartSaVioR commented on code in PR #36963:
URL: https://github.com/apache/spark/pull/36963#discussion_r904526435
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -77,7 +78,11 @@ class IncrementalExecution(
*/
override
lazy val optimizedPlan: LogicalPlan =
executePhase(QueryPlanningTracker.OPTIMIZATION) {
- sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
+ // Performing pre-optimization for streaming specific
+ val preOptimized = withCachedData.transform {
Review Comment:
Self-comment: additional comment to explain what it does.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala:
##########
@@ -877,13 +879,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
extends Product with Tre
t.copy(properties = Utils.redact(t.properties).toMap,
options = Utils.redact(t.options).toMap) :: Nil
case table: CatalogTable =>
- table.storage.serde match {
- case Some(serde) => table.identifier :: serde :: Nil
- case _ => table.identifier :: Nil
- }
+ stringArgsForCatalogTable(table)
+
case other => other :: Nil
}.mkString(", ")
+ private def stringArgsForCatalogTable(table: CatalogTable): Seq[Any] = {
+ table.storage.serde match {
+ case Some(serde) => table.identifier :: serde :: Nil
Review Comment:
I see an inconsistency in how the table identifier is represented on the plan —
while this produces a "quoted" string, I also see existing code in the codebase
producing an "unquoted" string, e.g. LogicalRelation.
I'd like to hear opinions on which is our preference. For now I used the
"unquoted" string in other places and left this as it is (hence the "quoted"
string).
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -167,6 +173,17 @@ case class StreamingDataSourceV2Relation(
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
+
+ private val stringArgsVal: Seq[Any] = {
+ val tableQualifier = (catalog, identifier) match {
Review Comment:
Printing out `catalog` and `identifier` separately does not seem to give a
good output, hence I changed it to produce a single output for the composite
fields `catalog` and `identifier` whenever they are both provided in the logical
node (only on the streaming code path).
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala:
##########
@@ -868,6 +868,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
extends Product with Tre
case null => Nil
case None => Nil
case Some(null) => Nil
+ case Some(table: CatalogTable) =>
Review Comment:
This is needed to represent the CatalogTable properly if it is present.
(Before the fix it called CatalogTable.toString, which produces a bunch of
information spanning multiple lines.)
I feel this could be generalized, but I don't know whether we have other
existing cases to handle.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -576,14 +589,22 @@ class MicroBatchExecution(
// Replace sources in the logical plan with data that has arrived since
the last batch.
val newBatchesPlan = logicalPlan transform {
// For v1 sources.
- case StreamingExecutionRelation(source, output) =>
+ case StreamingExecutionRelation(source, output, catalogTable) =>
newData.get(source).map { dataPlan =>
val hasFileMetadata = output.exists {
case FileSourceMetadataAttribute(_) => true
case _ => false
}
val finalDataPlan = dataPlan transformUp {
- case l: LogicalRelation if hasFileMetadata =>
l.withMetadataColumns()
+ case l: LogicalRelation =>
Review Comment:
Many DSv1 data sources produce a LogicalRelation as the leaf node, which
contains a Relation with a catalogTable field. We fill in the catalogTable
information in cases where the source is not able to provide it.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala:
##########
@@ -42,7 +48,11 @@ object StreamingRelation {
* It should be used to create [[Source]] and converted to
[[StreamingExecutionRelation]] when
* passing to [[StreamExecution]] to run a query.
*/
-case class StreamingRelation(dataSource: DataSource, sourceName: String,
output: Seq[Attribute])
+case class StreamingRelation(
+ dataSource: DataSource,
+ sourceName: String,
+ output: Seq[Attribute],
+ catalogTable: Option[CatalogTable])
Review Comment:
Self-comment: DataSource has a field `catalogTable`. Leverage it instead of
adding a new column.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSourceV1.scala:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.streaming.OutputMode
+
+case class WriteToMicroBatchDataSourceV1(
+ catalogTable: Option[CatalogTable],
+ sink: Sink,
+ query: LogicalPlan,
+ queryId: String,
+ writeOptions: Map[String, String],
+ outputMode: OutputMode,
+ batchId: Option[Long] = None)
+ extends UnaryNode {
+
+ override def child: LogicalPlan = query
+
+ // Despite this is logically the top node, this node should behave like
"pass-through"
+ // since the DSv1 codepath on microbatch execution handles sink operation
separately.
+ // We will eliminate this node in physical planning, which shouldn't make
difference as
Review Comment:
Self-review: Outdated. Now we eliminate this in optimization. Update the
comment.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -113,7 +114,9 @@ class MicroBatchExecution(
v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
nextSourceId += 1
logInfo(s"Using Source [$source] from DataSourceV2 named
'$srcName' $dsStr")
- StreamingExecutionRelation(source, output)(sparkSession)
+ // We don't have a catalog table but may have a table identifier.
Given this is about
Review Comment:
Actually, this else statement handles the edge case where the data source is
based on DSv2 and implements continuous read but does not implement microbatch
read.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -168,7 +171,17 @@ class MicroBatchExecution(
extraOptions,
outputMode)
- case _ => _logicalPlan
+ case s: Sink =>
+ WriteToMicroBatchDataSourceV1(
Review Comment:
It'd be nice if we could deal with SPARK-27484, but let's defer it for now
as it may bring additional work/concerns.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -113,7 +114,9 @@ class MicroBatchExecution(
v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
nextSourceId += 1
logInfo(s"Using Source [$source] from DataSourceV2 named
'$srcName' $dsStr")
- StreamingExecutionRelation(source, output)(sparkSession)
+ // We don't have a catalog table but may have a table identifier.
Given this is about
Review Comment:
This is based on the fact that we are encouraging third parties to use DSv2 for
streaming data sources whenever possible. (We don't make any improvements to
DSv1 now.) When a data source implements DSv2, it is advised to implement
microbatch read functionality as well.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala:
##########
@@ -111,7 +122,11 @@ case class StreamingExecutionRelation(
* A dummy physical plan for [[StreamingRelation]] to support
* [[org.apache.spark.sql.Dataset.explain]]
*/
-case class StreamingRelationExec(sourceName: String, output: Seq[Attribute])
extends LeafExecNode {
+case class StreamingRelationExec(
+ sourceName: String,
+ output: Seq[Attribute],
+ tableIdentifier: Option[String]) extends LeafExecNode {
+ // FIXME: check the representation of this node and come up with good format
to show table name
Review Comment:
Self-review: remove it as it's already addressed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]