keypointt commented on a change in pull request #25465: [SPARK-28747][SQL] merge the two data source v2 fallback configs
URL: https://github.com/apache/spark/pull/25465#discussion_r315932660
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
##########
@@ -251,64 +251,62 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotBucketed("save")
-    val session = df.sparkSession
-    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
-    val canUseV2 = canUseV2Source(session, cls) && partitioningColumns.isEmpty
-
-    // In Data Source V2 project, partitioning is still under development.
-    // Here we fallback to V1 if partitioning columns are specified.
-    // TODO(SPARK-26778): use V2 implementations when partitioning feature is supported.
-    if (canUseV2) {
-      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
-      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-        provider, session.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
-      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
-
-      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-      provider.getTable(dsOptions) match {
-        // TODO (SPARK-27815): To not break existing tests, here we treat file source as a special
-        // case, and pass the save mode to file source directly. This hack should be removed.
-        case table: FileTable =>
-          val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
-            .mode(modeForDSV1) // should not change default mode for file source.
-            .withQueryId(UUID.randomUUID().toString)
-            .withInputDataSchema(df.logicalPlan.schema)
-            .buildForBatch()
-          // The returned `Write` can be null, which indicates that we can skip writing.
-          if (write != null) {
+    lookupV2Provider() match {
+      // TODO(SPARK-26778): use V2 implementations when partition columns are specified
+      case Some(provider) if partitioningColumns.isEmpty =>
+        saveToV2Source(provider)
+
+      case _ => saveToV1Source()
+    }
+  }
+
+  private def saveToV2Source(provider: TableProvider): Unit = {
+    val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+      provider, df.sparkSession.sessionState.conf)
+    val options = sessionOptions ++ extraOptions
+    val dsOptions = new CaseInsensitiveStringMap(options.asJava)
+
+    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+    provider.getTable(dsOptions) match {
+      // TODO (SPARK-27815): To not break existing tests, here we treat file source as a special
+      // case, and pass the save mode to file source directly. This hack should be removed.
+      case table: FileTable =>
+        val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
+          .mode(modeForDSV1) // should not change default mode for file source.
+          .withQueryId(UUID.randomUUID().toString)
+          .withInputDataSchema(df.logicalPlan.schema)
+          .buildForBatch()
+        // The returned `Write` can be null, which indicates that we can skip writing.
+        if (write != null) {
+          runCommand(df.sparkSession, "save") {
+            WriteToDataSourceV2(write, df.logicalPlan)
+          }
+        }
+
+      case table: SupportsWrite if table.supports(BATCH_WRITE) =>
+        lazy val relation = DataSourceV2Relation.create(table, dsOptions)
+        modeForDSV2 match {
+          case SaveMode.Append =>
             runCommand(df.sparkSession, "save") {
-              WriteToDataSourceV2(write, df.logicalPlan)
+              AppendData.byName(relation, df.logicalPlan)
             }
-          }
-        case table: SupportsWrite if table.supports(BATCH_WRITE) =>
-          lazy val relation = DataSourceV2Relation.create(table, dsOptions)
-          modeForDSV2 match {
-            case SaveMode.Append =>
-              runCommand(df.sparkSession, "save") {
-                AppendData.byName(relation, df.logicalPlan)
-              }
-
-            case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) =>
-              // truncate the table
-              runCommand(df.sparkSession, "save") {
-                OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
-              }
-
-            case other =>
-              throw new AnalysisException(s"TableProvider implementation $source cannot be " +
-                s"written with $other mode, please use Append or Overwrite " +
-                "modes instead.")
-          }
+          case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) =>
+            // truncate the table
+            runCommand(df.sparkSession, "save") {
+              OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
+            }
-          // Streaming also uses the data source V2 API. So it may be that the data source implements
-          // v2, but has no v2 implementation for batch writes. In that case, we fall back to saving
-          // as though it's a V1 source.
-          case _ => saveToV1Source()
-        }
-      } else {
-        saveToV1Source()
+          case other =>
+            throw new AnalysisException(s"TableProvider implementation $source cannot be " +
+              s"written with $other mode, please use Append or Overwrite " +
Review comment:
```use `Append` or `Overwrite` save mode instead``` ?
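
For context, the suggestion applies to the `case other =>` branch at the end of the hunk. Below is a minimal sketch of the resulting message with the suggested wording spliced in; the standalone helper object and its name are hypothetical, added here only for illustration, since the diff builds the string inline inside `AnalysisException`:

```scala
import org.apache.spark.sql.SaveMode

// Hypothetical helper, for illustration only: builds the error text with the
// reviewer's suggested wording. `source` and `mode` stand in for the $source
// field and the $other binding in the diff above.
object SaveModeErrors {
  def unsupportedModeMessage(source: String, mode: SaveMode): String =
    s"TableProvider implementation $source cannot be written with $mode mode, " +
      "please use `Append` or `Overwrite` save mode instead."
}

// e.g. SaveModeErrors.unsupportedModeMessage("my.v2.Source", SaveMode.Ignore) yields:
// "TableProvider implementation my.v2.Source cannot be written with Ignore mode,
//  please use `Append` or `Overwrite` save mode instead."
```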