HeartSaVioR commented on a change in pull request #35374:
URL: https://github.com/apache/spark/pull/35374#discussion_r799106086



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
##########
@@ -712,15 +831,84 @@ class WriteDistributionAndOrderingSuite
     }
   }
 
+  private def checkMicroBatchWriteRequirements(
+      tableDistribution: Distribution,
+      tableOrdering: Array[SortOrder],
+      tableNumPartitions: Option[Int],
+      expectedWritePartitioning: physical.Partitioning,
+      expectedWriteOrdering: Seq[catalyst.expressions.SortOrder],
+      writeTransform: DataFrame => DataFrame = df => df,
+      outputMode: String = "append",
+      expectAnalysisException: Boolean = false): Unit = {
+
+    catalog.createTable(ident, schema, Array.empty, emptyProps, 
tableDistribution,
+      tableOrdering, tableNumPartitions)
+
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[(Long, String)]
+      val inputDF = inputData.toDF().toDF("id", "data")
+
+      val queryDF = outputMode match {
+        case "append" =>
+          inputDF
+        case "complete" =>

Review comment:
       Let's address "update" mode as well; it's ill-defined but there are 
users tolerating the definition due to the latency requirement.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
##########
@@ -77,6 +81,36 @@ object V2Writes extends Rule[LogicalPlan] with 
PredicateHelper {
       }
       val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, 
conf)
       o.copy(write = Some(write), query = newQuery)
+
+    case WriteToMicroBatchDataSource(
+        relation, table, query, queryId, writeOptions, outputMode, 
Some(batchId)) =>
+
+      val writeBuilder = newWriteBuilder(table, query, writeOptions, queryId)
+      val write = buildWriteForMicroBatch(table, writeBuilder, outputMode)
+      val microBatchWrite = new MicroBatchWrite(batchId, write.toStreaming)
+      val customMetrics = write.supportedCustomMetrics.toSeq
+      val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, 
conf)
+      WriteToDataSourceV2(relation, microBatchWrite, newQuery, customMetrics)
+  }
+
+  private def buildWriteForMicroBatch(
+      table: SupportsWrite,
+      writeBuilder: WriteBuilder,
+      outputMode: OutputMode): Write = {
+
+    outputMode match {
+      case Append =>
+        writeBuilder.build()
+      case Complete =>
+        // TODO: we should do this check earlier when we have capability API.

Review comment:
       I agree it's beyond this PR.

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
##########
@@ -35,6 +35,11 @@
    * Spark will distribute incoming records across partitions to satisfy the 
required distribution
    * before passing the records to the data source table on write.
    * <p>
+   * Batch and micro-batch writes can request a particular data distribution.

Review comment:
       > We can add some validation and throw an exception for continuous 
execution (can be a separate PR?).
   
   +1
   If it's easy to do then I'd prefer doing this in this PR. We tend to have a 
long time gap between PRs, even across releases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to