GitHub user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21319#discussion_r188299230
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -22,78 +22,57 @@ import scala.collection.JavaConverters._
     import org.apache.spark.sql.AnalysisException
     import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
     import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    -import org.apache.spark.sql.catalyst.plans.QueryPlan
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
    -import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    -import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
    +import org.apache.spark.sql.sources.DataSourceRegister
     import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
    -import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
     import org.apache.spark.sql.types.StructType
     
    +/**
    + * A logical plan representing a data source v2 scan.
    + *
    + * @param source An instance of a [[DataSourceV2]] implementation.
    + * @param options The options for this scan. Used to create fresh [[DataSourceReader]].
    + * @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh
    + *                            [[DataSourceReader]].
    + * @param optimizedReader An optimized [[DataSourceReader]] which is produced by the optimizer
    + *                        rule [[PushDownOperatorsToDataSource]]. It is a temporary value that is
    + *                        excluded from the equality definition of this class, to avoid
    + *                        re-applying operator pushdown and re-creating the [[DataSourceReader]]
    + *                        when we copy the relation during query plan transformation.
    + * @param pushedFilters The filters that are pushed down to the data source.
    + */
     case class DataSourceV2Relation(
    +    output: Seq[AttributeReference],
         source: DataSourceV2,
         options: Map[String, String],
    -    projection: Seq[AttributeReference],
    -    filters: Option[Seq[Expression]] = None,
    -    userSpecifiedSchema: Option[StructType] = None)
    +    userSpecifiedSchema: Option[StructType],
    +    optimizedReader: Option[DataSourceReader] = None,
    +    pushedFilters: Seq[Expression] = Nil)
       extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
     
       import DataSourceV2Relation._
     
    -  override def simpleString: String = "RelationV2 " + metadataString
    -
    -  override lazy val schema: StructType = reader.readSchema()
    +  def createFreshReader: DataSourceReader = source.createReader(options, userSpecifiedSchema)
     
    -  override lazy val output: Seq[AttributeReference] = {
    -    // use the projection attributes to avoid assigning new ids. fields that are not projected
    -    // will be assigned new ids, which is okay because they are not projected.
    -    val attrMap = projection.map(a => a.name -> a).toMap
    -    schema.map(f => attrMap.getOrElse(f.name,
    -      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
    -  }
    +  def reader: DataSourceReader = optimizedReader.getOrElse(createFreshReader)
     
    -  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
    -
    -  // postScanFilters: filters that need to be evaluated after the scan.
    -  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
    -  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
    --- End diff ---
    
    Nit: There can be overlap between `postScanFilters` and `pushedFilters`.
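
    To illustrate that overlap, here is a minimal, hypothetical sketch (not code from this PR; only
    the filter classes from `org.apache.spark.sql.sources` are real). A filter that a source can
    apply only approximately, e.g. at Parquet row-group granularity, is pushed down *and* kept as a
    post-scan filter, so it appears in both sets:

    ```scala
    import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}

    object FilterOverlapSketch {
      // Assumed capabilities of some source: IsNotNull is evaluated exactly,
      // GreaterThan only approximately (best effort), so Spark cannot rely on it alone.
      private def handledExactly(f: Filter): Boolean = f.isInstanceOf[IsNotNull]
      private def handledApproximately(f: Filter): Boolean = f.isInstanceOf[GreaterThan]

      def main(args: Array[String]): Unit = {
        val filters: Seq[Filter] = Seq(IsNotNull("i"), GreaterThan("i", 5))

        // pushedFilters: everything the source can use, exactly or approximately.
        val pushedFilters = filters.filter(f => handledExactly(f) || handledApproximately(f))
        // postScanFilters: everything Spark must still re-evaluate after the scan.
        val postScanFilters = filters.filterNot(handledExactly)

        // GreaterThan("i", 5) appears in both sets -- that is the overlap.
        println(s"pushed:    ${pushedFilters.mkString(", ")}")
        println(s"post-scan: ${postScanFilters.mkString(", ")}")
      }
    }
    ```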
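
    Relatedly, the new scaladoc says `optimizedReader` is excluded from the equality definition of
    the relation. A generic sketch of that pattern (hypothetical stand-in types, not the actual
    `DataSourceV2Relation` implementation): equality ignores the cached, derived field, so copies
    made during plan transformation still compare equal and no fresh reader or pushdown is needed:

    ```scala
    // Hypothetical stand-in for a relation that caches an optimized reader.
    case class RelationSketch(
        name: String,
        cachedReader: Option[String] = None) {

      // equals/hashCode deliberately ignore cachedReader.
      override def equals(other: Any): Boolean = other match {
        case r: RelationSketch => r.name == name
        case _ => false
      }
      override def hashCode(): Int = name.hashCode
    }

    object RelationEqualitySketch {
      def main(args: Array[String]): Unit = {
        // A copy without the cached reader is still equal to the optimized relation.
        assert(RelationSketch("t", Some("optimized reader")) == RelationSketch("t"))
      }
    }
    ```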

