rdblue commented on a change in pull request #23208: [SPARK-25530][SQL] data
source v2 API refactor (batch write)
URL: https://github.com/apache/spark/pull/23208#discussion_r247685011
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
##########
@@ -241,33 +242,38 @@ final class DataFrameWriter[T] private[sql](ds:
Dataset[T]) {
assertNotBucketed("save")
- val cls = DataSource.lookupDataSource(source,
df.sparkSession.sessionState.conf)
- if (classOf[DataSourceV2].isAssignableFrom(cls)) {
- val source =
cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
- source match {
- case provider: BatchWriteSupportProvider =>
- val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
- source,
- df.sparkSession.sessionState.conf)
- val options = sessionOptions ++ extraOptions
-
+ val session = df.sparkSession
+ val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
+ if (classOf[TableProvider].isAssignableFrom(cls)) {
+ val provider =
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ provider, session.sessionState.conf)
+ val options = sessionOptions ++ extraOptions
+ val dsOptions = new DataSourceOptions(options.asJava)
+ provider.getTable(dsOptions) match {
+ case table: SupportsBatchWrite =>
if (mode == SaveMode.Append) {
- val relation = DataSourceV2Relation.createRelationForWrite(source,
options)
+ val relation = DataSourceV2Relation.create(table, options)
runCommand(df.sparkSession, "save") {
AppendData.byName(relation, df.logicalPlan)
}
-
} else {
- val writer = provider.createBatchWriteSupport(
- UUID.randomUUID().toString,
- df.logicalPlan.output.toStructType,
- mode,
- new DataSourceOptions(options.asJava))
-
- if (writer.isPresent) {
- runCommand(df.sparkSession, "save") {
- WriteToDataSourceV2(writer.get, df.logicalPlan)
- }
+ val writeBuilder = table.newWriteBuilder(dsOptions)
+ .withQueryId(UUID.randomUUID().toString)
+ .withInputDataSchema(df.logicalPlan.schema)
+ writeBuilder match {
+ case s: SupportsSaveMode =>
+ val write = s.mode(mode).buildForBatch()
+ // It can only return null with `SupportsSaveMode`. We can
clean it up after
+ // removing `SupportsSaveMode`.
+ if (write != null) {
+ runCommand(df.sparkSession, "save") {
+ WriteToDataSourceV2(write, df.logicalPlan)
+ }
+ }
+
+ case _ => throw new AnalysisException(
+ s"data source ${table.name} does not support SaveMode")
Review comment:
The error should say that the source doesn't support the requested save mode
(the value of `mode`), not that it doesn't support SaveMode in general.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]