Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20387#discussion_r167288341
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,17 +17,130 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    +import java.util.UUID
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    -import org.apache.spark.sql.catalyst.expressions.AttributeReference
    -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    -import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
    +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
    +import org.apache.spark.sql.types.StructType
     
     case class DataSourceV2Relation(
    -    output: Seq[AttributeReference],
    -    reader: DataSourceReader)
    -  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
    +    source: DataSourceV2,
    +    options: Map[String, String],
    +    projection: Option[Seq[AttributeReference]] = None,
    +    filters: Option[Seq[Expression]] = None,
    +    userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
    +
    +  override def simpleString: String = {
    +    s"DataSourceV2Relation(source=$sourceName, " +
    +      s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
    +      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
    +  }
    +
    +  override lazy val schema: StructType = reader.readSchema()
    +
    +  override lazy val output: Seq[AttributeReference] = {
    +    projection match {
    +      case Some(attrs) =>
    +        // use the projection attributes to avoid assigning new ids. fields that are not projected
    +        // will be assigned new ids, which is okay because they are not projected.
    +        val attrMap = attrs.map(a => a.name -> a).toMap
    +        schema.map(f => attrMap.getOrElse(f.name,
    +          AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
    +      case _ =>
    +        schema.toAttributes
    +    }
    +  }
    +
    +  private lazy val v2Options: DataSourceOptions = {
    +    // ensure path and table options are set correctly
    +    val updatedOptions = new mutable.HashMap[String, String]
    +    updatedOptions ++= options
    +
    +    new DataSourceOptions(updatedOptions.asJava)
    +  }
    +
    +  private val sourceName: String = {
    +    source match {
    +      case registered: DataSourceRegister =>
    +        registered.shortName()
    +      case _ =>
    +        source.getClass.getSimpleName
    +    }
    +  }
    +
    +  lazy val (
    +      reader: DataSourceReader,
    +      unsupportedFilters: Seq[Expression],
    +      pushedFilters: Seq[Expression]) = {
    +    val newReader = userSchema match {
    +      case Some(s) =>
    +        asReadSupportWithSchema.createReader(s, v2Options)
    +      case _ =>
    +        asReadSupport.createReader(v2Options)
    +    }
    +
    +    projection.foreach { attrs =>
    +      DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType)
    +    }
    +
    +    val (remainingFilters, pushedFilters) = filters match {
    +      case Some(filterSeq) =>
    +        DataSourceV2Relation.pushFilters(newReader, filterSeq)
    +      case _ =>
    +        (Nil, Nil)
    +    }
    +
    +    (newReader, remainingFilters, pushedFilters)
    +  }
     
    -  override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
    +  def writer(dfSchema: StructType, mode: SaveMode): Option[DataSourceWriter] = {
    --- End diff ---
    
    There's a key difference here that you're missing. The streaming API was 
rushed and committed without adequate review input. The comment indicated that 
the commit was probably unfinished and, if so, the remaining changes could be 
put into a completely different commit.
    
    In contrast, this PR contains a few additional things that make 
`DataSourceV2Relation` complete and more useful for the next PRs. Thinking about 
how this relation should be configured from all the code paths where it will be 
used, and preparing for those cases, just isn't the same kind of problem.
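
    To make that concrete, here is a rough sketch of the kind of call sites I have 
in mind. This is not code from this PR; `readPath` and `writePath` are 
hypothetical stand-ins for the read and write paths that will construct and 
reuse the relation:

    ```scala
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
    import org.apache.spark.sql.sources.v2.DataSourceV2
    import org.apache.spark.sql.types.StructType

    // Hypothetical read path: only the source and its options are known up front;
    // projection and filters stay None until the optimizer pushes them down.
    def readPath(source: DataSourceV2, options: Map[String, String]): DataSourceV2Relation =
      DataSourceV2Relation(source, options)

    // Hypothetical write path: reuse the same relation to get a writer for the
    // incoming DataFrame's schema, if the source supports writes.
    def writePath(relation: DataSourceV2Relation, dfSchema: StructType): Unit = {
      relation.writer(dfSchema, SaveMode.Append) match {
        case Some(writer) =>
          // in a real plan this writer would be handed to the v2 write exec node
          println(s"created ${writer.getClass.getSimpleName}")
        case None =>
          println("source does not support writes for these options")
      }
    }
    ```

    The point is that the same relation has to carry enough information for all of 
those paths, which is why the options, user schema, and pushed projection and 
filters are part of the relation itself.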
    
    I do appreciate you taking my recommendation to heart, but if you think 
this is "the same mistake," then I don't think you've quite understood what I'm 
saying.


---
