Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9468#discussion_r43963399
  
    --- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
 ---
    @@ -70,43 +76,258 @@ class SimpleTextHadoopFsRelationSuite extends 
HadoopFsRelationTest {
         }
       }
     
    -  private val writer = testDF.write.option("dataSchema", 
dataSchema.json).format(dataSourceName)
    -  private val reader = sqlContext.read.option("dataSchema", 
dataSchema.json).format(dataSourceName)
    -
    -  test("unhandledFilters") {
    -    withTempPath { dir =>
    -
    -      val path = dir.getCanonicalPath
    -      writer.save(s"$path/p=0")
    -      writer.save(s"$path/p=1")
    -
    -      val isOdd = udf((_: Int) % 2 == 1)
    -      val df = reader.load(path)
    -        .filter(
    -          // This filter is inconvertible
    -          isOdd('a) &&
    -            // This filter is convertible but unhandled
    -            'a > 1 &&
    -            // This filter is convertible and handled
    -            'b > "val_1" &&
    -            // This filter references a partiiton column, won't be pushed 
down
    -            'p === 1
    -        ).select('a, 'p)
    -      val rawScan = df.queryExecution.executedPlan collect {
    +  private var tempPath: File = _
    +
    +  private var partitionedDF: DataFrame = _
    +
    +  private val partitionedDataSchema: StructType = StructType('a.int :: 
'b.int :: 'c.string :: Nil)
    +
    +  protected override def beforeAll(): Unit = {
    +    this.tempPath = Utils.createTempDir()
    +
    +    val df = sqlContext.range(10).select(
    +      'id cast IntegerType as 'a,
    +      ('id cast IntegerType) * 2 as 'b,
    +      concat(lit("val_"), 'id) as 'c
    +    )
    +
    +    partitionedWriter(df).save(s"${tempPath.getCanonicalPath}/p=0")
    +    partitionedWriter(df).save(s"${tempPath.getCanonicalPath}/p=1")
    +
    +    partitionedDF = partitionedReader.load(tempPath.getCanonicalPath)
    +  }
    +
    +  override protected def afterAll(): Unit = {
    +    Utils.deleteRecursively(tempPath)
    +  }
    +
    +  private def partitionedWriter(df: DataFrame) =
    +    df.write.option("dataSchema", 
partitionedDataSchema.json).format(dataSourceName)
    +
    +  private def partitionedReader =
    +    sqlContext.read.option("dataSchema", 
partitionedDataSchema.json).format(dataSourceName)
    +
    +  /**
    +   * Constructs test cases that test column pruning and filter push-down.
    +   *
    +   * @param projections Projection list of the query
    +   * @param filter Filter condition of the query
    +   * @param requiredColumns Expected names of required columns
    +   * @param pushedFilters Expected data source [[Filter]]s that are pushed 
down
    +   * @param inconvertibleFilters Expected Catalyst filter [[Expression]]s 
that cannot be converted
    +   *        to data source [[Filter]]s
    +   * @param unhandledFilters Expected Catalyst flter [[Expression]]s that 
can be converted to data
    +   *        source [[Filter]]s but cannot be handled by the data source 
relation
    +   * @param partitioningFilters Expected Catalyst filter [[Expression]]s 
that reference partition
    +   *        columns
    +   * @param expectedRawScanAnswer Expected query result of the raw table 
scan returned by the data
    +   *        source relation
    +   * @param expectedAnswer Expected query result of the full query
    +   */
    +  def testPruningAndFiltering(
    +      projections: Seq[Column],
    +      filter: Column,
    +      requiredColumns: Seq[String],
    +      pushedFilters: Seq[Filter],
    +      inconvertibleFilters: Seq[Column],
    +      unhandledFilters: Seq[Column],
    +      partitioningFilters: Seq[Column])(
    +      expectedRawScanAnswer: => Seq[Row])(
    +      expectedAnswer: => Seq[Row]): Unit = {
    +    test(s"pruning and filtering: df.select(${projections.mkString(", 
")}).where($filter)") {
    +      val df = partitionedDF.where(filter).select(projections: _*)
    +      val queryExecution = df.queryExecution
    +      val executedPlan = queryExecution.executedPlan
    +
    +      val rawScan = executedPlan.collect {
             case p: PhysicalRDD => p
           } match {
    -        case Seq(p) => p
    +        case Seq(scan) => scan
    +        case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
           }
     
    -      val outputSchema = new StructType().add("a", IntegerType).add("p", 
IntegerType)
    +      markup("Checking raw scan answer")
    +      checkAnswer(
    +        DataFrame(sqlContext, LogicalRDD(rawScan.output, 
rawScan.rdd)(sqlContext)),
    +        expectedRawScanAnswer)
    +
    +      markup("Checking full query answer")
    +      checkAnswer(df, expectedAnswer)
    +
    +      markup("Checking required columns")
    +      assert(requiredColumns === SimpleTextRelation.requiredColumns)
     
    -      assertResult(Set((2, 1), (3, 1))) {
    -        rawScan.execute().collect()
    -          .map { CatalystTypeConverters.convertToScala(_, outputSchema) }
    -          .map { case Row(a, p) => (a, p) }.toSet
    +      val nonPushedFilters = {
    +        val boundFilters = executedPlan.collect {
    +          case f: execution.Filter => f
    +        } match {
    +          case Nil => Nil
    +          case Seq(f) => splitConjunctivePredicates(f.condition)
    +          case _ => fail(s"More than one PhysicalRDD 
found\n$queryExecution")
    +        }
    +
    +        boundFilters.map {
    +          _.transform { case a: AttributeReference => 
UnresolvedAttribute(a.name) }
    +        }.toSet
           }
     
    -      checkAnswer(df, Row(3, 1))
    +      markup("Checking pushed filters")
    +      assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet)
    +
    +      val expectedInconvertibleFilters = 
inconvertibleFilters.map(_.expr).toSet
    +      val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet
    +      val expectedPartitioningFilters = 
partitioningFilters.map(_.expr).toSet
    +
    +      markup("Checking unhandled, inconvertible, and partitioning filters")
    +      assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === 
nonPushedFilters)
    +      
assert(expectedPartitioningFilters.intersect(nonPushedFilters).isEmpty)
         }
       }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('*),
    +    filter = 'p > 0,
    +    requiredColumns = Seq("a", "b", "c"),
    +    pushedFilters = Nil,
    +    inconvertibleFilters = Nil,
    +    unhandledFilters = Nil,
    +    partitioningFilters = Nil
    +  ) {
    +    Seq(
    +      Row(0, 0, "val_0", 1),
    +      Row(1, 2, "val_1", 1),
    +      Row(2, 4, "val_2", 1),
    +      Row(3, 6, "val_3", 1),
    +      Row(4, 8, "val_4", 1),
    +      Row(5, 10, "val_5", 1),
    +      Row(6, 12, "val_6", 1),
    +      Row(7, 14, "val_7", 1),
    +      Row(8, 16, "val_8", 1),
    +      Row(9, 18, "val_9", 1))
    +  } {
    +    Seq(
    +      Row(0, 0, "val_0", 1),
    +      Row(1, 2, "val_1", 1),
    +      Row(2, 4, "val_2", 1),
    +      Row(3, 6, "val_3", 1),
    +      Row(4, 8, "val_4", 1),
    +      Row(5, 10, "val_5", 1),
    +      Row(6, 12, "val_6", 1),
    +      Row(7, 14, "val_7", 1),
    +      Row(8, 16, "val_8", 1),
    +      Row(9, 18, "val_9", 1))
    +  }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('c, 'p),
    +    filter = 'a < 3 && 'p > 0,
    +    requiredColumns = Seq("c", "a"),
    +    pushedFilters = Nil,
    +    inconvertibleFilters = Nil,
    +    unhandledFilters = Seq('a < 3),
    +    partitioningFilters = Nil
    +  ) {
    +    Seq(
    +      Row("val_0", 1, 0),
    +      Row("val_1", 1, 1),
    +      Row("val_2", 1, 2),
    +      Row("val_3", 1, 3),
    +      Row("val_4", 1, 4),
    +      Row("val_5", 1, 5),
    +      Row("val_6", 1, 6),
    +      Row("val_7", 1, 7),
    +      Row("val_8", 1, 8),
    +      Row("val_9", 1, 9))
    +  } {
    +    Seq(
    +      Row("val_0", 1),
    +      Row("val_1", 1),
    +      Row("val_2", 1))
    +  }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('*),
    +    filter = 'a > 8,
    +    requiredColumns = Seq("a", "b", "c"),
    +    pushedFilters = Seq(GreaterThan("a", 8)),
    +    inconvertibleFilters = Nil,
    +    unhandledFilters = Nil,
    +    partitioningFilters = Nil
    +  ) {
    +    Seq(
    +      Row(9, 18, "val_9", 0),
    +      Row(9, 18, "val_9", 1))
    +  } {
    +    Seq(
    +      Row(9, 18, "val_9", 0),
    +      Row(9, 18, "val_9", 1))
    +  }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('b, 'p),
    +    filter = 'a > 8,
    +    requiredColumns = Seq("b"),
    +    pushedFilters = Seq(GreaterThan("a", 8)),
    +    inconvertibleFilters = Nil,
    +    unhandledFilters = Nil,
    +    partitioningFilters = Nil
    +  ) {
    +    Seq(
    +      Row(18, 0),
    +      Row(18, 1))
    +  } {
    +    Seq(
    +      Row(18, 0),
    +      Row(18, 1))
    +  }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('b, 'p),
    +    filter = 'a > 8 && 'p > 0,
    +    requiredColumns = Seq("b"),
    +    pushedFilters = Seq(GreaterThan("a", 8)),
    +    inconvertibleFilters = Nil,
    +    unhandledFilters = Nil,
    +    partitioningFilters = Seq('p > 0)
    +  ) {
    +    Seq(
    +      Row(18, 1))
    +  } {
    +    Seq(
    +      Row(18, 1))
    +  }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('b, 'p),
    +    filter = 'c > "val_7" && 'b < 18 && 'p > 0,
    +    requiredColumns = Seq("b"),
    +    pushedFilters = Seq(GreaterThan("c", "val_7")),
    +    inconvertibleFilters = Nil,
    +    unhandledFilters = Seq('b < 18),
    +    partitioningFilters = Seq('p > 0)
    +  ) {
    +    Seq(
    +      Row(16, 1),
    +      Row(18, 1))
    +  } {
    +    Seq(
    +      Row(16, 1))
    +  }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('b, 'p),
    +    filter = 'a % 2 === 0 && 'c > "val_7" && 'b < 18 && 'p > 0,
    --- End diff --
    
    1. Partitioning filters don't participate in filter push-down; they are 
handled separately in `DataSourceStrategy`.
    2. Catalyst filter `Expression`s that cannot be converted to data source 
`Filter`s are not pushed down (e.g. UDFs and filters referencing multiple 
columns).
    3. Catalyst filter `Expression`s that can be converted to data source 
`Filter`s but cannot be handled by the underlying data source are not pushed 
down (e.g. those returned by `BaseRelation.unhandledFilters()`).
    
    Adding the above comments to the test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to