Github user tengpeng commented on a diff in the pull request:
https://github.com/apache/spark/pull/20933#discussion_r200819285
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -241,39 +240,47 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance()
-      ds match {
-        case ws: WriteSupport =>
-          val options = new DataSourceOptions((extraOptions ++
-            DataSourceV2Utils.extractSessionConfigs(
-              ds = ds.asInstanceOf[DataSourceV2],
-              conf = df.sparkSession.sessionState.conf)).asJava)
-          // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
-          // enough as there won't be tons of writing jobs created at the same second.
-          val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-            .format(new Date()) + "-" + UUID.randomUUID()
-          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
-          if (writer.isPresent) {
-            runCommand(df.sparkSession, "save") {
-              WriteToDataSourceV2(writer.get(), df.logicalPlan)
-            }
-          }
+      val (needToFallBackFileDataSourceV2, fallBackFileFormat) = ds match {
+        case f: FileDataSourceV2 =>
+          val disabledV2Readers =
+            df.sparkSession.sessionState.conf.disabledV2FileDataSourceWriter.split(",")
+          (disabledV2Readers.contains(f.shortName), f.fallBackFileFormat.getCanonicalName)
+        case _ => (false, source)
+      }

-        // Streaming also uses the data source V2 API. So it may be that the data source implements
-        // v2, but has no v2 implementation for batch writes. In that case, we fall back to saving
-        // as though it's a V1 source.
-        case _ => saveToV1Source()
+      if (ds.isInstanceOf[WriteSupport] && !needToFallBackFileDataSourceV2) {
+        val options = new DataSourceOptions((extraOptions ++
+          DataSourceV2Utils.extractSessionConfigs(
+            ds = ds.asInstanceOf[DataSourceV2],
+            conf = df.sparkSession.sessionState.conf)).asJava)
+        // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
+        // enough as there won't be tons of writing jobs created at the same second.
+        val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+          .format(new Date()) + "-" + UUID.randomUUID()
+        val writer = ds.asInstanceOf[WriteSupport]
+          .createWriter(jobId, df.logicalPlan.schema, mode, options)
--- End diff --
I am not sure I understand this: why do we use `.createWriter` here, while we
do not use `.createReader` in `DataFrameReader`? It seems "unsymmetrical" to me.
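
To make the asymmetry concrete, here is a minimal, self-contained sketch of the
two dispatch styles. The trait and method below are simplified, hypothetical
stand-ins, not the actual DataSourceV2 API:

    // Hypothetical stand-in for the write-side capability interface.
    trait WriteSupport {
      def createWriter(jobId: String): Option[String]
    }

    object DispatchStyles {
      // Reader-side style: a pattern match binds a typed reference once.
      def matchStyle(ds: Any): Option[String] = ds match {
        case ws: WriteSupport => ws.createWriter("job-1")
        case _ => None
      }

      // Writer-side style after this diff: an isInstanceOf guard plus an
      // explicit asInstanceOf cast. Same behavior, but the cast is repeated
      // and is not checked by the compiler.
      def castStyle(ds: Any): Option[String] =
        if (ds.isInstanceOf[WriteSupport]) {
          ds.asInstanceOf[WriteSupport].createWriter("job-1")
        } else {
          None
        }
    }

If the fallback flag has to be computed before dispatch, the match could still
bind a typed `ws` and check the flag inside the case, keeping a single checked
binding instead of two casts.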