GitHub user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21503#discussion_r195173700
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -32,79 +31,35 @@ import org.apache.spark.sql.types.StructType
 case class DataSourceV2Relation(
     source: DataSourceV2,
+    output: Seq[AttributeReference],
     options: Map[String, String],
-    projection: Seq[AttributeReference],
-    filters: Option[Seq[Expression]] = None,
     userSpecifiedSchema: Option[StructType] = None)
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

   import DataSourceV2Relation._

-  override def simpleString: String = "RelationV2 " + metadataString
-
-  override lazy val schema: StructType = reader.readSchema()
-
-  override lazy val output: Seq[AttributeReference] = {
-    // use the projection attributes to avoid assigning new ids. fields that are not projected
-    // will be assigned new ids, which is okay because they are not projected.
-    val attrMap = projection.map(a => a.name -> a).toMap
-    schema.map(f => attrMap.getOrElse(f.name,
-      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
-  }
-
-  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
+  override def pushedFilters: Seq[Expression] = Seq.empty

-  // postScanFilters: filters that need to be evaluated after the scan.
-  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
-  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
-  lazy val (
-      reader: DataSourceReader,
-      postScanFilters: Seq[Expression],
-      pushedFilters: Seq[Expression]) = {
-    val newReader = userSpecifiedSchema match {
-      case Some(s) =>
-        source.asReadSupportWithSchema.createReader(s, v2Options)
-      case _ =>
-        source.asReadSupport.createReader(v2Options)
-    }
-
-    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
-
-    val (postScanFilters, pushedFilters) = filters match {
-      case Some(filterSeq) =>
-        DataSourceV2Relation.pushFilters(newReader, filterSeq)
-      case _ =>
-        (Nil, Nil)
-    }
-    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
-    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
-
-    (newReader, postScanFilters, pushedFilters)
-  }
-
-  override def doCanonicalize(): LogicalPlan = {
-    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
+  override def simpleString: String = "RelationV2 " + metadataString

-    // override output with canonicalized output to avoid attempting to configure a reader
-    val canonicalOutput: Seq[AttributeReference] = this.output
-        .map(a => QueryPlan.normalizeExprId(a, projection))
+  lazy val v2Options: DataSourceOptions = makeV2Options(options)

-    new DataSourceV2Relation(c.source, c.options, c.projection) {
-      override lazy val output: Seq[AttributeReference] = canonicalOutput
-    }
+  def newReader: DataSourceReader = userSpecifiedSchema match {
+    case Some(userSchema) =>
+      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
+    case None =>
+      source.asReadSupport.createReader(v2Options)
   }

-  override def computeStats(): Statistics = reader match {
+  override def computeStats(): Statistics = newReader match {
     case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }

   override def newInstance(): DataSourceV2Relation = {
--- End diff --
I thought that initially, but the canonicalization test was failing without
this.
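For anyone skimming the thread, here is a minimal sketch of the kind of check
the canonicalization test performs. The relation construction and names below
are illustrative, not the actual test code, and `source` (a DataSourceV2) and
`options` are assumed to be in scope:

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.IntegerType

    // Each AttributeReference(...)() call mints a fresh ExprId, so these two
    // relations are identical except for the expression IDs in their output.
    val a = DataSourceV2Relation(
      source, Seq(AttributeReference("i", IntegerType)()), options)
    val b = DataSourceV2Relation(
      source, Seq(AttributeReference("i", IntegerType)()), options)

    // sameResult compares canonicalized plans. Canonicalization has to
    // normalize expression IDs (QueryPlan.normalizeExprId against the plan's
    // own output); if it does not, the differing ExprIds make this assertion
    // fail even though the two relations are logically the same.
    assert(a.sameResult(b))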
---