GitHub user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20933#discussion_r201427780
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -241,39 +240,47 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
         if (classOf[DataSourceV2].isAssignableFrom(cls)) {
           val ds = cls.newInstance()
    -      ds match {
    -        case ws: WriteSupport =>
    -          val options = new DataSourceOptions((extraOptions ++
    -            DataSourceV2Utils.extractSessionConfigs(
    -              ds = ds.asInstanceOf[DataSourceV2],
    -              conf = df.sparkSession.sessionState.conf)).asJava)
    -          // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
    -          // enough as there won't be tons of writing jobs created at the same second.
    -          val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    -            .format(new Date()) + "-" + UUID.randomUUID()
    -          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
    -          if (writer.isPresent) {
    -            runCommand(df.sparkSession, "save") {
    -              WriteToDataSourceV2(writer.get(), df.logicalPlan)
    -            }
    -          }
    +      val (needToFallBackFileDataSourceV2, fallBackFileFormat) = ds match {
    +        case f: FileDataSourceV2 =>
    +          val disabledV2Readers =
    +            df.sparkSession.sessionState.conf.disabledV2FileDataSourceWriter.split(",")
    +          (disabledV2Readers.contains(f.shortName), f.fallBackFileFormat.getCanonicalName)
    +        case _ => (false, source)
    +      }
     
    -        // Streaming also uses the data source V2 API. So it may be that the data source implements
    -        // v2, but has no v2 implementation for batch writes. In that case, we fall back to saving
    -        // as though it's a V1 source.
    -        case _ => saveToV1Source()
    +      if (ds.isInstanceOf[WriteSupport] && !needToFallBackFileDataSourceV2) {
    +        val options = new DataSourceOptions((extraOptions ++
    +          DataSourceV2Utils.extractSessionConfigs(
    +            ds = ds.asInstanceOf[DataSourceV2],
    +            conf = df.sparkSession.sessionState.conf)).asJava)
    +        // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
    +        // enough as there won't be tons of writing jobs created at the same second.
    +        val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    +          .format(new Date()) + "-" + UUID.randomUUID()
    +        val writer = ds.asInstanceOf[WriteSupport]
    +          .createWriter(jobId, df.logicalPlan.schema, mode, options)
    --- End diff --
    
    It is. We're still evolving the v2 API and integration with Spark. This problem is addressed in PR #21305, which is the first of a series of changes to standardize the logical plans and fix problems like this one.
    
    There's also an [open proposal for those changes](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d).


