[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21503


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r195173700
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -32,79 +31,35 @@ import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
     source: DataSourceV2,
+    output: Seq[AttributeReference],
     options: Map[String, String],
-    projection: Seq[AttributeReference],
-    filters: Option[Seq[Expression]] = None,
     userSpecifiedSchema: Option[StructType] = None)
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   import DataSourceV2Relation._
 
-  override def simpleString: String = "RelationV2 " + metadataString
-
-  override lazy val schema: StructType = reader.readSchema()
-
-  override lazy val output: Seq[AttributeReference] = {
-    // use the projection attributes to avoid assigning new ids. fields that are not projected
-    // will be assigned new ids, which is okay because they are not projected.
-    val attrMap = projection.map(a => a.name -> a).toMap
-    schema.map(f => attrMap.getOrElse(f.name,
-      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
-  }
-
-  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
+  override def pushedFilters: Seq[Expression] = Seq.empty
 
-  // postScanFilters: filters that need to be evaluated after the scan.
-  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
-  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
-  lazy val (
-      reader: DataSourceReader,
-      postScanFilters: Seq[Expression],
-      pushedFilters: Seq[Expression]) = {
-    val newReader = userSpecifiedSchema match {
-      case Some(s) =>
-        source.asReadSupportWithSchema.createReader(s, v2Options)
-      case _ =>
-        source.asReadSupport.createReader(v2Options)
-    }
-
-    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
-
-    val (postScanFilters, pushedFilters) = filters match {
-      case Some(filterSeq) =>
-        DataSourceV2Relation.pushFilters(newReader, filterSeq)
-      case _ =>
-        (Nil, Nil)
-    }
-    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
-    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
-
-    (newReader, postScanFilters, pushedFilters)
-  }
-
-  override def doCanonicalize(): LogicalPlan = {
-    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
+  override def simpleString: String = "RelationV2 " + metadataString
 
-    // override output with canonicalized output to avoid attempting to configure a reader
-    val canonicalOutput: Seq[AttributeReference] = this.output
-        .map(a => QueryPlan.normalizeExprId(a, projection))
+  lazy val v2Options: DataSourceOptions = makeV2Options(options)
 
-    new DataSourceV2Relation(c.source, c.options, c.projection) {
-      override lazy val output: Seq[AttributeReference] = canonicalOutput
-    }
+  def newReader: DataSourceReader = userSpecifiedSchema match {
+    case Some(userSchema) =>
+      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
+    case None =>
+      source.asReadSupport.createReader(v2Options)
   }
 
-  override def computeStats(): Statistics = reader match {
+  override def computeStats(): Statistics = newReader match {
     case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
 
   override def newInstance(): DataSourceV2Relation = {
--- End diff --

I thought that initially, but the canonicalization test was failing without 
this.
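(For context: `MultiInstanceRelation` requires `newInstance()` to produce fresh attribute ids. The quoted diff cuts off before the overridden body, so the following is only an assumed sketch of the idea, not the PR's code:)

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Assumed for illustration only: re-create the output attributes with new
// expression ids, which is what a MultiInstanceRelation's newInstance() must do.
def freshOutput(output: Seq[AttributeReference]): Seq[AttributeReference] =
  output.map(_.newInstance())
```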


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r195170789
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -32,79 +31,35 @@ import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
     source: DataSourceV2,
+    output: Seq[AttributeReference],
     options: Map[String, String],
-    projection: Seq[AttributeReference],
-    filters: Option[Seq[Expression]] = None,
     userSpecifiedSchema: Option[StructType] = None)
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   import DataSourceV2Relation._
 
-  override def simpleString: String = "RelationV2 " + metadataString
-
-  override lazy val schema: StructType = reader.readSchema()
-
-  override lazy val output: Seq[AttributeReference] = {
-    // use the projection attributes to avoid assigning new ids. fields that are not projected
-    // will be assigned new ids, which is okay because they are not projected.
-    val attrMap = projection.map(a => a.name -> a).toMap
-    schema.map(f => attrMap.getOrElse(f.name,
-      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
-  }
-
-  private lazy val v2Options: DataSourceOptions = makeV2Options(options)
+  override def pushedFilters: Seq[Expression] = Seq.empty
 
-  // postScanFilters: filters that need to be evaluated after the scan.
-  // pushedFilters: filters that will be pushed down and evaluated in the underlying data sources.
-  // Note: postScanFilters and pushedFilters can overlap, e.g. the parquet row group filter.
-  lazy val (
-      reader: DataSourceReader,
-      postScanFilters: Seq[Expression],
-      pushedFilters: Seq[Expression]) = {
-    val newReader = userSpecifiedSchema match {
-      case Some(s) =>
-        source.asReadSupportWithSchema.createReader(s, v2Options)
-      case _ =>
-        source.asReadSupport.createReader(v2Options)
-    }
-
-    DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)
-
-    val (postScanFilters, pushedFilters) = filters match {
-      case Some(filterSeq) =>
-        DataSourceV2Relation.pushFilters(newReader, filterSeq)
-      case _ =>
-        (Nil, Nil)
-    }
-    logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
-    logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
-
-    (newReader, postScanFilters, pushedFilters)
-  }
-
-  override def doCanonicalize(): LogicalPlan = {
-    val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]
+  override def simpleString: String = "RelationV2 " + metadataString
 
-    // override output with canonicalized output to avoid attempting to configure a reader
-    val canonicalOutput: Seq[AttributeReference] = this.output
-        .map(a => QueryPlan.normalizeExprId(a, projection))
+  lazy val v2Options: DataSourceOptions = makeV2Options(options)
 
-    new DataSourceV2Relation(c.source, c.options, c.projection) {
-      override lazy val output: Seq[AttributeReference] = canonicalOutput
-    }
+  def newReader: DataSourceReader = userSpecifiedSchema match {
+    case Some(userSchema) =>
+      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
+    case None =>
+      source.asReadSupport.createReader(v2Options)
   }
 
-  override def computeStats(): Statistics = reader match {
+  override def computeStats(): Statistics = newReader match {
     case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
 
   override def newInstance(): DataSourceV2Relation = {
--- End diff --

I think we don't need to override `newInstance` now.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r195138932
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

Yeah, no problem. I can just remove the stats commit from this one.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194961443
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

@rdblue do you have time to prepare a PR for the 2nd proposal? I can do that too if you are busy with other stuff.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194953803
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

I'm not strongly opposed to any of the options, but 2 would be my choice if 
I had to pick one. A temporary state where functionality is missing is easier 
to reason about than temporary states where we deliberately impose a fuzzy 
lifecycle.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194895594
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

Yeah, the second proposal is what happens for the v1 data sources. For file-based data sources we kind of pick the third proposal and add an optimizer rule `PruneFileSourcePartitions` to push down some of the filters to the data source at the logical phase, to get precise stats.

I'd like to pick from the 2nd and 3rd proposals (the 3rd proposal is also temporary, before we move stats to the physical plan). Applying pushdown twice is hard to work around (we'd need to cache), while we can keep the `PruneFileSourcePartitions` rule to work around the issue in the 2nd proposal for file-based data sources.

Let's also get more input from other people.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194875888
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

I don't mind either option #1 or #2. #2 is basically what happens for non-v2 data sources right now. Plus, both should be temporary.

I think it is a bad idea to continue with hacky code that uses the reader in the logical plan. It is much cleaner otherwise, and we've spent too much time making sure that everything still works. The main example that comes to mind is setting the requested projection and finding out what the output is using pushdown. I think hacks are slowing progress on the v2 sources.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194871032
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

OK, we need to make a decision here:
1. Apply operator pushdown twice (proposed by this PR). This moves the pushdown logic to the planner, which is more ideal and cleaner. The drawback is that, before moving statistics to the physical plan, we have some duplicated pushdown code in `DataSourceV2Relation`, and applying pushdown twice has a performance penalty.
2. Apply operator pushdown only once, in the planner. Same as 1, it's cleaner. The drawback is that, before moving statistics to the physical plan, data source v2 can't report statistics after filter.
3. Apply operator pushdown only once, in the optimizer (proposed by https://github.com/apache/spark/pull/21319). It has no performance penalty and we can report statistics after filter. The drawback is that, before moving statistics to the physical plan, we have a temporary `DataSourceReader` in `DataSourceV2Relation`, which is hacky.

The tradeoff is: shall we bear with hacky code and move forward with data source v2 operator pushdown support? Or shall we make the code cleaner and bear with some performance penalty (apply pushdown twice, or not report stats after filter)? Or shall we just hold back and think about how to move stats to the physical plan?

cc @marmbrus @jose-torres 


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194861645
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

I didn't realize you were talking about other v2 sources. Yes, two readers would be configured for v2. If you wanted to avoid that, you could cache in the implementation when pushdown is expensive, or we could add something else that prevents that case.

We need to do *something* to fix the current behavior of doing pushdown in the optimizer. I'm perfectly happy with less accurate stats for v2 until stats use the physical plan, or with a solution like this where pushdown happens twice. I just don't think it is a good idea to continue with the design where the logical plan needs to use the v2 reader APIs. I think we agree that that should happen once, and conversion to the physical plan is where it makes the most sense.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194858392
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

> there's nothing forcing other data sources to implement the new trait ...

hmmm, I'm a little confused here. All v2 data sources would have to apply pushdown twice, right? Or are you suggesting we should not migrate file-based data sources to data source v2?


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194841328
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

@cloud-fan, there's nothing forcing other data sources to implement the new 
trait. Other sources can continue to report stats for the entire table and not 
account for filters (the code assumes that row counts don't change). This just 
opens the option of reporting stats that are more accurate using the filters 
and projection that will be pushed.

Ideally, I think that stats-based decisions would happen after pushdown so 
we get data that is as accurate as possible. But for now, this fixes the 
regression for v2 sources that happens because we move pushdown to a later step 
(conversion to physical plan like the other sources).


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194839473
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

It's nice to decouple the problem and do pushdown during planning, but I feel the cost is too high in this approach. For file-based data sources, we need to query the Hive metastore to apply partition pruning during filter pushdown, and this can be very expensive. Doing it twice looks scary to me.

cc @gatorsmile @dongjoon-hyun @mallman, please correct me if I have a wrong understanding.

Also cc @wzhfy: do you have an estimate of how long it will take to move statistics to the physical plan?


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194829647
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

It will configure two readers: one for the pushdown when converting to a physical plan, and one for stats. The stats one should be temporary, though, since we want to address that problem. Configuring two readers instead of one allows us to decouple the problems so we can move forward with pushdown that works like the other data sources.


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21503#discussion_r194828220
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff --

To confirm: do we have to do operator pushdown twice now? Once in the plan visitor to calculate statistics, and once here to build the physical plan, right?


---




[GitHub] spark pull request #21503: [SPARK-24478][SQL] Move projection and filter pus...

2018-06-06 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21503

[SPARK-24478][SQL] Move projection and filter push down to physical conversion

## What changes were proposed in this pull request?

This removes the v2 optimizer rule for push-down and instead pushes filters and required columns when converting to a physical plan, as suggested by @marmbrus. This makes the v2 relation cleaner because the output and filters do not change in the logical plan.
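
As a rough illustration of what pushdown at physical planning means against the DataSourceV2 reader mix-ins, here is a minimal sketch; it assumes a freshly configured reader (as `relation.newReader` in the diffs above), and the helper names are illustrative rather than the PR's actual code:

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

object PushdownSketch {
  // Prune columns if the reader supports it; otherwise the full schema is read.
  def pushRequiredColumns(reader: DataSourceReader, requiredSchema: StructType): Unit = reader match {
    case r: SupportsPushDownRequiredColumns => r.pruneColumns(requiredSchema)
    case _ =>
  }

  // Push filters if the reader supports it; the filters it cannot handle are
  // returned and must be evaluated after the scan.
  def pushFilters(reader: DataSourceReader, filters: Array[Filter]): Array[Filter] = reader match {
    case r: SupportsPushDownFilters => r.pushFilters(filters)
    case _ => filters
  }
}
```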

To solve the problem of getting accurate statistics in the optimizer (push-down now happens later), this adds a new trait, `SupportsPhysicalStats`, that calculates `LeafNode` stats using the filters and projection. This trait can also be implemented by v1 data sources to get more accurate stats for CBO.
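
A minimal sketch of the shape such a trait could take; the name comes from this description, but the exact signature shown here is an assumption rather than the API added by the PR:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}

// Assumed signature for illustration only: a leaf relation that can report
// statistics given the projection and filters that will be pushed down.
trait SupportsPhysicalStats { self: LeafNode =>
  def computeStats(projection: Seq[NamedExpression], filters: Seq[Expression]): Statistics
}
```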

The first commit was proposed in #21262. This PR replaces #21262.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark SPARK-24478-move-push-down-to-physical-conversion

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21503.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21503


commit c8517e145b1a460a8be07164c17ce20b1db86659
Author: Ryan Blue 
Date:   2018-05-07T20:08:02Z

DataSourceV2: push projection, filters when converting to physical plan.

commit 9d3a11e68bca6c5a56a2be47fb09395350362ac5
Author: Ryan Blue 
Date:   2018-06-06T20:17:16Z

SPARK-24478: Add trait to report stats with filters and projection.




---
