aokolnychyi commented on a change in pull request #35374:
URL: https://github.com/apache/spark/pull/35374#discussion_r796124265
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
##########
@@ -77,6 +81,36 @@ object V2Writes extends Rule[LogicalPlan] with
PredicateHelper {
}
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query,
conf)
o.copy(write = Some(write), query = newQuery)
+
+ case WriteToMicroBatchDataSource(
+ relation, table, query, queryId, writeOptions, outputMode,
Some(batchId)) =>
+
+ val writeBuilder = newWriteBuilder(table, query, writeOptions, queryId)
+ val write = buildWriteForMicroBatch(table, writeBuilder, outputMode)
+ val microBatchWrite = new MicroBatchWrite(batchId, write.toStreaming)
+ val customMetrics = write.supportedCustomMetrics.toSeq
+ val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query,
conf)
+ WriteToDataSourceV2(relation, microBatchWrite, newQuery, customMetrics)
+ }
+
+ private def buildWriteForMicroBatch(
+ table: SupportsWrite,
+ writeBuilder: WriteBuilder,
+ outputMode: OutputMode): Write = {
+
+ outputMode match {
+ case Append =>
+ writeBuilder.build()
+ case Complete =>
+ // TODO: we should do this check earlier when we have capability API.
Review comment:
I copied this code along with the original TODO item.
I can create an issue for this, but I think we should address it separately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]