Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/9468#discussion_r43942705
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala ---
@@ -70,43 +76,258 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}
- private val writer = testDF.write.option("dataSchema", dataSchema.json).format(dataSourceName)
- private val reader = sqlContext.read.option("dataSchema", dataSchema.json).format(dataSourceName)
-
- test("unhandledFilters") {
- withTempPath { dir =>
-
- val path = dir.getCanonicalPath
- writer.save(s"$path/p=0")
- writer.save(s"$path/p=1")
-
- val isOdd = udf((_: Int) % 2 == 1)
- val df = reader.load(path)
- .filter(
- // This filter is inconvertible
- isOdd('a) &&
- // This filter is convertible but unhandled
- 'a > 1 &&
- // This filter is convertible and handled
- 'b > "val_1" &&
- // This filter references a partition column, won't be pushed down
- 'p === 1
- ).select('a, 'p)
- val rawScan = df.queryExecution.executedPlan collect {
+ private var tempPath: File = _
+
+ private var partitionedDF: DataFrame = _
+
+ private val partitionedDataSchema: StructType = StructType('a.int :: 'b.int :: 'c.string :: Nil)
+
+ protected override def beforeAll(): Unit = {
+ this.tempPath = Utils.createTempDir()
+
+ val df = sqlContext.range(10).select(
+ 'id cast IntegerType as 'a,
+ ('id cast IntegerType) * 2 as 'b,
+ concat(lit("val_"), 'id) as 'c
+ )
+
+ partitionedWriter(df).save(s"${tempPath.getCanonicalPath}/p=0")
+ partitionedWriter(df).save(s"${tempPath.getCanonicalPath}/p=1")
+
+ partitionedDF = partitionedReader.load(tempPath.getCanonicalPath)
+ }
+
+ override protected def afterAll(): Unit = {
+ Utils.deleteRecursively(tempPath)
+ }
+
+ private def partitionedWriter(df: DataFrame) =
+ df.write.option("dataSchema", partitionedDataSchema.json).format(dataSourceName)
+
+ private def partitionedReader =
+ sqlContext.read.option("dataSchema", partitionedDataSchema.json).format(dataSourceName)
+
+ /**
+ * Constructs test cases that test column pruning and filter push-down.
+ *
+ * @param projections Projection list of the query
+ * @param filter Filter condition of the query
+ * @param requiredColumns Expected names of required columns
+ * @param pushedFilters Expected data source [[Filter]]s that are pushed down
+ * @param inconvertibleFilters Expected Catalyst filter [[Expression]]s that cannot be converted
+ * to data source [[Filter]]s
+ * @param unhandledFilters Expected Catalyst filter [[Expression]]s that can be converted to data
+ * source [[Filter]]s but cannot be handled by the data source relation
+ * @param partitioningFilters Expected Catalyst filter [[Expression]]s that reference partition
+ * columns
+ * @param expectedRawScanAnswer Expected query result of the raw table scan returned by the data
+ * source relation
+ * @param expectedAnswer Expected query result of the full query
+ */
+ def testPruningAndFiltering(
+ projections: Seq[Column],
+ filter: Column,
+ requiredColumns: Seq[String],
+ pushedFilters: Seq[Filter],
+ inconvertibleFilters: Seq[Column],
+ unhandledFilters: Seq[Column],
+ partitioningFilters: Seq[Column])(
+ expectedRawScanAnswer: => Seq[Row])(
+ expectedAnswer: => Seq[Row]): Unit = {
+ test(s"pruning and filtering: df.select(${projections.mkString(",
")}).where($filter)") {
+ val df = partitionedDF.where(filter).select(projections: _*)
+ val queryExecution = df.queryExecution
+ val executedPlan = queryExecution.executedPlan
+
+ val rawScan = executedPlan.collect {
case p: PhysicalRDD => p
} match {
- case Seq(p) => p
+ case Seq(scan) => scan
+ case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
}
- val outputSchema = new StructType().add("a", IntegerType).add("p", IntegerType)
+ markup("Checking raw scan answer")
+ checkAnswer(
+ DataFrame(sqlContext, LogicalRDD(rawScan.output, rawScan.rdd)(sqlContext)),
+ expectedRawScanAnswer)
+
+ markup("Checking full query answer")
+ checkAnswer(df, expectedAnswer)
+
+ markup("Checking required columns")
+ assert(requiredColumns === SimpleTextRelation.requiredColumns)
- assertResult(Set((2, 1), (3, 1))) {
- rawScan.execute().collect()
- .map { CatalystTypeConverters.convertToScala(_, outputSchema) }
- .map { case Row(a, p) => (a, p) }.toSet
+ val nonPushedFilters = {
+ val boundFilters = executedPlan.collect {
+ case f: execution.Filter => f
+ } match {
+ case Nil => Nil
+ case Seq(f) => splitConjunctivePredicates(f.condition)
+ case _ => fail(s"More than one PhysicalRDD
found\n$queryExecution")
+ }
+
+ boundFilters.map {
+ _.transform { case a: AttributeReference => UnresolvedAttribute(a.name) }
+ }.toSet
}
- checkAnswer(df, Row(3, 1))
+ markup("Checking pushed filters")
+ assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet)
+
+ val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet
+ val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet
+ val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet
+
+ markup("Checking unhandled, inconvertible, and partitioning filters")
+ assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters)
+ assert(expectedPartitioningFilters.intersect(nonPushedFilters).isEmpty)
--- End diff --
It seems not very obvious why `expectedPartitioningFilters.intersect(nonPushedFilters).isEmpty` should hold here.
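For what it's worth, one way to make the reasoning explicit in the test could look like the sketch below. This is a sketch only: it reuses the test's own names (`expectedPartitioningFilters`, `nonPushedFilters`, `queryExecution`) and assumes the usual `DataSourceStrategy` behavior, where predicates that reference only partition columns are consumed by partition pruning and therefore never reach the physical `execution.Filter` node that `nonPushedFilters` is rebuilt from.

```scala
// Sketch, not part of this PR: a predicate on a partition column such as 'p === 1 only
// selects which partition directories get scanned. It is neither pushed to
// SimpleTextRelation nor evaluated row-by-row after the scan, so it can never show up
// in nonPushedFilters, which is derived from the post-scan execution.Filter condition.
markup("Checking that partitioning filters are pruned rather than evaluated after the scan")
assert(
  expectedPartitioningFilters.intersect(nonPushedFilters).isEmpty,
  s"Partitioning filters should be handled by partition pruning only:\n$queryExecution")
```

Something along these lines would answer the question for the next reader without changing what the assertion checks.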