Github user gengliangwang commented on a diff in the pull request:
https://github.com/apache/spark/pull/21319#discussion_r188299230
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -22,78 +22,57 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

+/**
+ * A logical plan representing a data source v2 scan.
+ *
+ * @param source An instance of a [[DataSourceV2]] implementation.
+ * @param options The options for this scan. Used to create a fresh [[DataSourceReader]].
+ * @param userSpecifiedSchema The user-specified schema for this scan. Used to create a fresh
+ *                            [[DataSourceReader]].
+ * @param optimizedReader An optimized [[DataSourceReader]] produced by the optimizer rule
+ *                        [[PushDownOperatorsToDataSource]]. It is a temporary value that is
+ *                        excluded from the equality definition of this class, to avoid
+ *                        re-applying operator pushdown and re-creating the [[DataSourceReader]]
+ *                        when the relation is copied during query plan transformation.
+ * @param pushedFilters The filters that are pushed down to the data source.
+ */
case class DataSourceV2Relation(
+    output: Seq[AttributeReference],
    source: DataSourceV2,
    options: Map[String, String],
-    projection: Seq[AttributeReference],
-    filters: Option[Seq[Expression]] = None,
-    userSpecifiedSchema: Option[StructType] = None)
+    userSpecifiedSchema: Option[StructType],
+    optimizedReader: Option[DataSourceReader] = None,
+    pushedFilters: Seq[Expression] = Nil)
  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

import DataSourceV2Relation._

-  override def simpleString: String = "RelationV2 " + metadataString
-
-  override lazy val schema: StructType = reader.readSchema()
+  def createFreshReader: DataSourceReader = source.createReader(options, userSpecifiedSchema)

-  override lazy val output: Seq[AttributeReference] = {
-    // use the projection attributes to avoid assigning new ids. fields that are not projected
-    // will be assigned new ids, which is okay because they are not projected.
-    val attrMap = projection.map(a => a.name -> a).toMap
-    schema.map(f => attrMap.getOrElse(f.name,
-      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
-  }
+  def reader: DataSourceReader = optimizedReader.getOrElse(createFreshReader)

-  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
-
-  // postScanFilters: filters that need to be evaluated after the scan.
-  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
-  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
--- End diff ---
Nit: There can be overlap between postScanFilters and pushedFilters
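
For readers following along, here is a minimal sketch of why the two sets can overlap, assuming the Spark 2.3 SupportsPushDownFilters contract; the class name and the stubbed method bodies are hypothetical, not from this PR:

```scala
import java.util.Collections

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, SupportsPushDownFilters}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Hypothetical reader that prunes data only at a coarse granularity (like
// parquet row group statistics). It pushes every filter down, but also returns
// every filter as a post-scan filter so Spark re-evaluates them per row:
// the pushed set and the post-scan set overlap completely.
class RowGroupFilteringReader extends SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters // accept everything for best-effort pruning...
    filters          // ...and hand everything back for post-scan evaluation
  }

  override def pushedFilters(): Array[Filter] = pushed

  // Stubs to satisfy the DataSourceReader contract; a real reader would
  // return factories that actually produce rows.
  override def readSchema(): StructType = new StructType().add("i", IntegerType)
  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] =
    Collections.emptyList[DataReaderFactory[Row]]()
}
```

A source like parquet can skip whole row groups using min/max statistics but cannot guarantee per-row semantics, so Spark keeps the same filter in the post-scan Filter node even though it was pushed down.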
---