cloud-fan commented on code in PR #48822:
URL: https://github.com/apache/spark/pull/48822#discussion_r1849844676
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala:
##########
@@ -87,31 +92,44 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
o.copy(write = Some(write), query = newQuery)
case WriteToMicroBatchDataSource(
- relation, table, query, queryId, writeOptions, outputMode, Some(batchId)) =>
-
+ relationOpt, table, query, queryId, options, outputMode, Some(batchId)) =>
+ val writeOptions = mergeOptions(
+ options, relationOpt.map(r => r.options.asScala.toMap).getOrElse(Map.empty))
val writeBuilder = newWriteBuilder(table, writeOptions, query.schema, queryId)
val write = buildWriteForMicroBatch(table, writeBuilder, outputMode)
val microBatchWrite = new MicroBatchWrite(batchId, write.toStreaming)
val customMetrics = write.supportedCustomMetrics.toImmutableArraySeq
- val funCatalogOpt = relation.flatMap(_.funCatalog)
+ val funCatalogOpt = relationOpt.flatMap(_.funCatalog)
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, funCatalogOpt)
- WriteToDataSourceV2(relation, microBatchWrite, newQuery, customMetrics)
+ WriteToDataSourceV2(relationOpt, microBatchWrite, newQuery, customMetrics)
case rd @ ReplaceData(r: DataSourceV2Relation, _, query, _, _, None) =>
val rowSchema = DataTypeUtils.fromAttributes(rd.dataInput)
- val writeBuilder = newWriteBuilder(r.table, Map.empty, rowSchema)
+ val writeOptions = mergeOptions(Map.empty, r.options.asScala.toMap)
+ val writeBuilder = newWriteBuilder(r.table, writeOptions, rowSchema)
val write = writeBuilder.build()
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog)
// project away any metadata columns that could be used for distribution and ordering
rd.copy(write = Some(write), query = Project(rd.dataInput, newQuery))
case wd @ WriteDelta(r: DataSourceV2Relation, _, query, _, projections, None) =>
- val deltaWriteBuilder = newDeltaWriteBuilder(r.table, Map.empty, projections)
+ val writeOptions = mergeOptions(Map.empty, r.options.asScala.toMap)
+ val deltaWriteBuilder = newDeltaWriteBuilder(r.table, writeOptions, projections)
val deltaWrite = deltaWriteBuilder.build()
val newQuery = DistributionAndOrderingUtils.prepareQuery(deltaWrite, query, r.funCatalog)
wd.copy(write = Some(deltaWrite), query = newQuery)
}
+ private def mergeOptions(
+ commandOptions: Map[String, String],
+ dsOptions: Map[String, String]): Map[String, String] = {
+ // for DataFrame API cases, same options are carried by both Command and DataSourceV2Relation
+ // for DataFrameV2 API cases, options are only carried by Command
+ // for SQL cases, options are only carried by DataSourceV2Relation
Review Comment:
Will we take care of `SessionConfigSupport` in follow-up PRs?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]