Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20448#discussion_r164952844
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala
---
@@ -19,50 +19,31 @@ package org.apache.spark.sql.execution.datasources.v2
import java.util.Objects
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
+import org.apache.spark.sql.sources.v2.DataSourceV2
/**
- * A base class for data source reader holder with customized equals/hashCode methods.
+ * A base class for data source v2 related query plan. It defines the equals/hashCode methods
+ * according to some common information.
*/
-trait DataSourceReaderHolder {
+trait DataSourceV2QueryPlan {
- /**
- * The full output of the data source reader, without column pruning.
- */
- def fullOutput: Seq[AttributeReference]
+ def output: Seq[Attribute]
+ def sourceClass: Class[_ <: DataSourceV2]
+ def filters: Set[Expression]
- /**
- * The held data source reader.
- */
- def reader: DataSourceReader
-
- /**
- * The metadata of this data source reader that can be used for equality test.
- */
- private def metadata: Seq[Any] = {
- val filters: Any = reader match {
-    case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
- case s: SupportsPushDownFilters => s.pushedFilters().toSet
- case _ => Nil
- }
- Seq(fullOutput, reader.getClass, reader.readSchema(), filters)
- }
+  // The metadata of this data source relation that can be used for equality test.
+ private def metadata: Seq[Any] = Seq(output, sourceClass, filters)
def canEqual(other: Any): Boolean
override def equals(other: Any): Boolean = other match {
- case other: DataSourceReaderHolder =>
- canEqual(other) && metadata.length == other.metadata.length &&
- metadata.zip(other.metadata).forall { case (l, r) => l == r }
+ case other: DataSourceV2QueryPlan =>
+ canEqual(other) && metadata == other.metadata
case _ => false
}
override def hashCode(): Int = {
metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
}
-
-  lazy val output: Seq[Attribute] = reader.readSchema().map(_.name).map { name =>
--- End diff --
We don't need to do this anymore. Now that the plan is immutable, we have to
create a new plan when applying push-down optimizations, and we can update
`output` at that time as well.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]