HeartSaVioR commented on code in PR #36963:
URL: https://github.com/apache/spark/pull/36963#discussion_r904526435


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -77,7 +78,11 @@ class IncrementalExecution(
    */
   override
   lazy val optimizedPlan: LogicalPlan = 
executePhase(QueryPlanningTracker.OPTIMIZATION) {
-    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
+    // Performing pre-optimization for streaming specific
+    val preOptimized = withCachedData.transform {

Review Comment:
   Self-comment: additional comment to explain what it does.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala:
##########
@@ -877,13 +879,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product with Tre
       t.copy(properties = Utils.redact(t.properties).toMap,
         options = Utils.redact(t.options).toMap) :: Nil
     case table: CatalogTable =>
-      table.storage.serde match {
-        case Some(serde) => table.identifier :: serde :: Nil
-        case _ => table.identifier :: Nil
-      }
+      stringArgsForCatalogTable(table)
+
     case other => other :: Nil
   }.mkString(", ")
 
+  private def stringArgsForCatalogTable(table: CatalogTable): Seq[Any] = {
+    table.storage.serde match {
+      case Some(serde) => table.identifier :: serde :: Nil

Review Comment:
   I see an inconsistency in representing the table identifier in the plan - while 
this produces a "quoted" string, I also see the existing codebase produce 
"unquoted" strings, e.g. LogicalRelation.
   
   I'd like to hear opinions on which is our preference. For now I used 
"unquoted" strings in other places and left this one as it is (hence the "quoted" 
string).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala:
##########
@@ -167,6 +173,17 @@ case class StreamingDataSourceV2Relation(
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
+
+  private val stringArgsVal: Seq[Any] = {
+    val tableQualifier = (catalog, identifier) match {

Review Comment:
   Printing out `catalog` and `identifier` separately does not seem to give 
good output, hence I changed it to produce a single output for the composite fields 
`catalog` and `identifier` whenever they are provided together in the logical 
node (only on the streaming code path).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala:
##########
@@ -868,6 +868,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product with Tre
     case null => Nil
     case None => Nil
     case Some(null) => Nil
+    case Some(table: CatalogTable) =>

Review Comment:
   This is needed to represent the CatalogTable properly if it is present. 
(Before the fix it called CatalogTable.toString, which produces a bunch of 
information spanning multiple lines.)
   
   I feel this could be generalized, but I don't know whether we have other 
existing cases to handle.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -576,14 +589,22 @@ class MicroBatchExecution(
     // Replace sources in the logical plan with data that has arrived since 
the last batch.
     val newBatchesPlan = logicalPlan transform {
       // For v1 sources.
-      case StreamingExecutionRelation(source, output) =>
+      case StreamingExecutionRelation(source, output, catalogTable) =>
         newData.get(source).map { dataPlan =>
           val hasFileMetadata = output.exists {
             case FileSourceMetadataAttribute(_) => true
             case _ => false
           }
           val finalDataPlan = dataPlan transformUp {
-            case l: LogicalRelation if hasFileMetadata => 
l.withMetadataColumns()
+            case l: LogicalRelation =>

Review Comment:
   Many DSv1 data sources produce LogicalRelation as the leaf node, which contains 
a Relation having a catalogTable field. We fill in the catalogTable 
information in the case where the source is not able to provide it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala:
##########
@@ -42,7 +48,11 @@ object StreamingRelation {
  * It should be used to create [[Source]] and converted to 
[[StreamingExecutionRelation]] when
  * passing to [[StreamExecution]] to run a query.
  */
-case class StreamingRelation(dataSource: DataSource, sourceName: String, 
output: Seq[Attribute])
+case class StreamingRelation(
+    dataSource: DataSource,
+    sourceName: String,
+    output: Seq[Attribute],
+    catalogTable: Option[CatalogTable])

Review Comment:
   Self-comment: DataSource has a field `catalogTable`. Leverage it instead of 
adding a new field.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSourceV1.scala:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.streaming.OutputMode
+
+case class WriteToMicroBatchDataSourceV1(
+    catalogTable: Option[CatalogTable],
+    sink: Sink,
+    query: LogicalPlan,
+    queryId: String,
+    writeOptions: Map[String, String],
+    outputMode: OutputMode,
+    batchId: Option[Long] = None)
+  extends UnaryNode {
+
+  override def child: LogicalPlan = query
+
+  // Despite this is logically the top node, this node should behave like 
"pass-through"
+  // since the DSv1 codepath on microbatch execution handles sink operation 
separately.
+  // We will eliminate this node in physical planning, which shouldn't make 
difference as

Review Comment:
   Self-review: Outdated. Now we eliminate this in optimization. Update the 
comment.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -113,7 +114,9 @@ class MicroBatchExecution(
               
v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
             nextSourceId += 1
             logInfo(s"Using Source [$source] from DataSourceV2 named 
'$srcName' $dsStr")
-            StreamingExecutionRelation(source, output)(sparkSession)
+            // We don't have a catalog table but may have a table identifier. 
Given this is about

Review Comment:
   Actually this else statement handles the edge case where the data source is 
based on DSv2 and implements continuous read but does not implement microbatch 
read.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -168,7 +171,17 @@ class MicroBatchExecution(
           extraOptions,
           outputMode)
 
-      case _ => _logicalPlan
+      case s: Sink =>
+        WriteToMicroBatchDataSourceV1(

Review Comment:
   It'd be nice if we could deal with SPARK-27484, but let's defer it for now 
as it may bring additional work/concerns.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -113,7 +114,9 @@ class MicroBatchExecution(
               
v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
             nextSourceId += 1
             logInfo(s"Using Source [$source] from DataSourceV2 named 
'$srcName' $dsStr")
-            StreamingExecutionRelation(source, output)(sparkSession)
+            // We don't have a catalog table but may have a table identifier. 
Given this is about

Review Comment:
   This is based on the fact that we are encouraging 3rd parties to use DSv2 for 
streaming data sources whenever possible. (We don't make any improvements to 
DSv1 now.) When a data source implements DSv2, it is advised to implement the 
microbatch read functionality.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala:
##########
@@ -111,7 +122,11 @@ case class StreamingExecutionRelation(
  * A dummy physical plan for [[StreamingRelation]] to support
  * [[org.apache.spark.sql.Dataset.explain]]
  */
-case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) 
extends LeafExecNode {
+case class StreamingRelationExec(
+    sourceName: String,
+    output: Seq[Attribute],
+    tableIdentifier: Option[String]) extends LeafExecNode {
+  // FIXME: check the representation of this node and come up with good format 
to show table name

Review Comment:
   Self-review: remove it as it's already addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to