[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18000
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117168737

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -538,6 +538,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
       // scalastyle:on nonascii
     }
   }
+
+  test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names") {
--- End diff --

Looks much better now.
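For reference, the simple end-to-end test named in the diff above roughly takes this shape (a sketch assuming the suite's standard `withTempPath` helper and the `spark` session from `SharedSQLContext`; the merged test body may differ, e.g. it may also toggle the vectorized reader):

```scala
test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names") {
  withTempPath { path =>
    // Write a column whose name contains a dot, then filter on it with backticks.
    Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
    val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
    // With pushdown disabled for dotted names, the non-null row survives.
    assert(readBack.count() == 1)
  }
}
```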
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117168546

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -47,39 +49,47 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  *    data type is nullable.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
--- End diff --

Sure, I just reverted it and made a simple test.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117162950

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -47,39 +49,47 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  *    data type is nullable.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
--- End diff --

can we just have a simple end-to-end test? The fix is actually very simple and doesn't seem worth such complex tests to verify it.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117162403

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
--- End diff --

nit: `nameToType`
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117158402

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -490,6 +516,42 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("SPARK-20364 Do not push down filters when column names have dots") {
+    implicit class StringToAttribute(str: String) {
+      // Implicits for attr, $ and symbol do not handle backticks.
+      def attribute: Attribute = UnresolvedAttribute.quotedString(str)
--- End diff --

Yea, actually my initial version locally included the change for `symbol` and `$` to match them to `Column`. It also seems to make sense per https://github.com/apache/spark/pull/7969. I believe this is an internal API - https://github.com/apache/spark/blob/e9c91badce64731ffd3e53cbcd9f044a7593e6b8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala#L24 - so I guess it would be fine even if it introduced a behaviour change. Nevertheless, I believe some reviewers don't like this change much, and I wanted to avoid such changes here for now (it is the single place that needs it anyway, for now ...).
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117157965

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -490,6 +516,42 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("SPARK-20364 Do not push down filters when column names have dots") {
+    implicit class StringToAttribute(str: String) {
+      // Implicits for attr, $ and symbol do not handle backticks.
+      def attribute: Attribute = UnresolvedAttribute.quotedString(str)
--- End diff --

Shall we make `$` use `UnresolvedAttribute.quotedString`?
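A quick illustration of why `UnresolvedAttribute.quotedString` matters here (a sketch based on the catalyst API: the plain constructor splits on dots, while `quotedString` parses backticks; the exact behaviour may vary by version):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// Splits on dots: a reference to field "b" nested inside "a".
UnresolvedAttribute("a.b").nameParts                  // Seq("a", "b")

// Parses backticks: a single top-level column literally named "a.b".
UnresolvedAttribute.quotedString("`a.b`").nameParts   // Seq("a.b")
```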
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117145159

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
--- End diff --

Not just for speed, but also for the amount of code that needs to change. But I think it is ok for me.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117143908

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
--- End diff --

Hm, I expect this is a non-critical path and is not executed multiple times. Also, it does not look particularly faster to call `Filter.references` -> `Filter.findReferences` -> `Filter.references` ... Another downside (maybe nitpicking) is that this will introduce another small code path that returns `None` for filter creation failure.
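For context, the call chain mentioned above comes from how `Filter.references` is defined in `org.apache.spark.sql.sources` (a paraphrased sketch; the exact shape varies by Spark version):

```scala
abstract class Filter {
  def references: Array[String]

  // Recurses back into references when a filter's value is itself a Filter,
  // hence references -> findReferences -> references.
  protected def findReferences(value: Any): Array[String] = value match {
    case f: Filter => f.references
    case _ => Array.empty
  }
}

case class EqualTo(attribute: String, value: Any) extends Filter {
  override def references: Array[String] = Array(attribute) ++ findReferences(value)
}
```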
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117143600

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
--- End diff --

Ok. That makes sense.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117140137

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
--- End diff --

Yes, it is. However, AFAIK we don't currently log filters that fail to be created, e.g., `In`. Probably we should log in those cases across all the sources. If you don't feel strongly about this, I would like to not log here for now.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117132718

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
--- End diff --

We should log something for users, because it might not be straightforward for them to know that predicate pushdown is disabled for dot-columns. Since this is bad for performance, it seems better to let users know what happened.
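A minimal sketch of the logging viirya suggests (hypothetical and not adopted by the PR; the `pushedFilters` and `requiredSchema` names are assumed call-site variables in `ParquetFileFormat`, and `logWarning` assumes the surrounding class mixes in Spark's `Logging` trait):

```scala
// Hypothetical: warn once per filter that could not be converted,
// so users can tell that pushdown was skipped for dotted column names.
pushedFilters.foreach { filter =>
  if (ParquetFilters.createFilter(requiredSchema, filter).isEmpty) {
    logWarning(s"Parquet predicate pushdown skipped for $filter; " +
      "columns with dots in their names are not pushed down (SPARK-20364).")
  }
}
```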
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117132177

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
--- End diff --

Instead of checking dots for each predicate recursively in `ParquetFilters`, we can check `Filter.references` of the predicate at the top level in `ParquetFileFormat`, and skip `ParquetFilters.createFilter` entirely.
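A rough sketch of that alternative (variable names assumed; this is not what the PR ultimately did, since the check stayed inside `ParquetFilters`):

```scala
import org.apache.parquet.filter2.predicate.FilterApi

// Hypothetical top-level guard in ParquetFileFormat: drop any predicate that
// references a dotted column before attempting conversion at all.
val conjunctiveFilter = filters
  .filter(_.references.forall(name => !name.contains(".")))
  .flatMap(ParquetFilters.createFilter(requiredSchema, _))
  .reduceOption(FilterApi.and)
```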
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117061432

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -47,39 +49,45 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  *    data type is nullable.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  private def checkWithSelectedFilters
+      (df: DataFrame, predicate: Predicate)
+      (checker: (DataFrame, Seq[Filter]) => Unit): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+
+    val filtered = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[HadoopFsRelation] = None
+    val maybeAnalyzedPredicate = filtered.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(relation: HadoopFsRelation, _, _)) =>
+        maybeRelation = Some(relation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters, _) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+    checker(filtered, selectedFilters)
+  }
+
   private def checkFilterPredicate(
       df: DataFrame,
       predicate: Predicate,
       filterClass: Class[_ <: FilterPredicate],
       checker: (DataFrame, Seq[Row]) => Unit,
       expected: Seq[Row]): Unit = {
-    val output = predicate.collect { case a: Attribute => a }.distinct
+    checkWithSelectedFilters(df, predicate) { case (filtered, selectedFilters) =>
+      selectedFilters.foreach { pred =>
+        val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+      }
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        val query = df
-          .select(output.map(e => Column(e)): _*)
-          .where(Column(predicate))
-
-        var maybeRelation: Option[HadoopFsRelation] = None
-        val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-          case PhysicalOperation(_, filters, LogicalRelation(relation: HadoopFsRelation, _, _)) =>
-            maybeRelation = Some(relation)
-            filters
-        }.flatten.reduceLeftOption(_ && _)
-        assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
-
-        val (_, selectedFilters, _) =
-          DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
-        assert(selectedFilters.nonEmpty, "No filter is pushed down")
-
-        selectedFilters.foreach { pred =>
-          val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
-          assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
-          // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
-          maybeFilter.exists(_.getClass === filterClass)
--- End diff --

Thanks ...
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117061359

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+    // with missing columns. The incorrect results could be got from Parquet when we push down
+    // filters for the column having dots in the names. Thus, we do not push down such filters.
+    // See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = nameTypeMap.contains(name) && !name.contains(".")
--- End diff --

I think we should disallow it when initially loading or writing out, if it is still allowed in any way.
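For reference, the guard is applied per conversion case inside `createFilter`; a representative excerpt might look like this (shape inferred from the diff above, with most cases elided):

```scala
predicate match {
  case sources.IsNull(name) if canMakeFilterOn(name) =>
    makeEq.lift(nameTypeMap(name)).map(_(name, null))
  case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
    makeEq.lift(nameTypeMap(name)).map(_(name, value))
  // ... remaining cases follow the same pattern; any predicate on a dotted
  // column fails the guard and falls through to None.
  case _ => None
}
```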
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r11703

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -108,6 +116,15 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     checkFilterPredicate(df, predicate, filterClass, checkBinaryAnswer _, expected)
   }
 
+  private def checkNoFilterPredicate(predicate: Predicate)(implicit df: DataFrame): Unit = {
+    checkWithSelectedFilters(df, predicate) { case (_, selectedFilters) =>
+      selectedFilters.foreach { pred =>
+        val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+        assert(maybeFilter.isEmpty, s"Could generate filter predicate for $pred")
--- End diff --

We should also check correctness of the results.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117030781

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -108,6 +116,15 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     checkFilterPredicate(df, predicate, filterClass, checkBinaryAnswer _, expected)
   }
 
+  private def checkNoFilterPredicate(predicate: Predicate)(implicit df: DataFrame): Unit = {
+    checkWithSelectedFilters(df, predicate) { case (_, selectedFilters) =>
+      selectedFilters.foreach { pred =>
+        val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+        assert(maybeFilter.isEmpty, s"Could generate filter predicate for $pred")
--- End diff --

"Shouldn't generate filter predicate for $pred"?
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117030310

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -47,39 +49,45 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  *    data type is nullable.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  private def checkWithSelectedFilters
+      (df: DataFrame, predicate: Predicate)
+      (checker: (DataFrame, Seq[Filter]) => Unit): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+
+    val filtered = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[HadoopFsRelation] = None
+    val maybeAnalyzedPredicate = filtered.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(relation: HadoopFsRelation, _, _)) =>
+        maybeRelation = Some(relation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters, _) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+    checker(filtered, selectedFilters)
+  }
+
   private def checkFilterPredicate(
       df: DataFrame,
       predicate: Predicate,
       filterClass: Class[_ <: FilterPredicate],
       checker: (DataFrame, Seq[Row]) => Unit,
       expected: Seq[Row]): Unit = {
-    val output = predicate.collect { case a: Attribute => a }.distinct
+    checkWithSelectedFilters(df, predicate) { case (filtered, selectedFilters) =>
+      selectedFilters.foreach { pred =>
+        val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+      }
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        val query = df
-          .select(output.map(e => Column(e)): _*)
-          .where(Column(predicate))
-
-        var maybeRelation: Option[HadoopFsRelation] = None
-        val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-          case PhysicalOperation(_, filters, LogicalRelation(relation: HadoopFsRelation, _, _)) =>
-            maybeRelation = Some(relation)
-            filters
-        }.flatten.reduceLeftOption(_ && _)
-        assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
-
-        val (_, selectedFilters, _) =
-          DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
-        assert(selectedFilters.nonEmpty, "No filter is pushed down")
-
-        selectedFilters.foreach { pred =>
-          val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
-          assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
-          // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
-          maybeFilter.exists(_.getClass === filterClass)
--- End diff --

Don't we need the `filterClass` check? Why remove it?
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r117026406

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+    // with missing columns. The incorrect results could be got from Parquet when we push down
+    // filters for the column having dots in the names. Thus, we do not push down such filters.
+    // See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = nameTypeMap.contains(name) && !name.contains(".")
--- End diff --

Do we need to consider other special characters, e.g., those in apache/parquet-mr#361?
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116964092

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+    // with missing columns. The incorrect results could be got from Parquet when we push down
+    // filters for the column having dots in the names. Thus, we do not push down such filters.
--- End diff --

Either way, I believe we should disable it, because the pushed Parquet filter effectively refers to another column.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116950919

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+    // with missing columns. The incorrect results could be got from Parquet when we push down
+    // filters for the column having dots in the names. Thus, we do not push down such filters.
--- End diff --

Yes, but the problem is that the filter (almost) always evaluates to NULL when the column name has dots, because the column path becomes nested (`a.b`, not `` `a.b` ``) in the Parquet filter predicate. You are right about `IsNull`. I pointed this out in https://github.com/apache/spark/pull/17680#discussion_r112285883 - such predicates (almost) always evaluate to `true` on the Parquet side, but the rows are then filtered out on the Spark side. So for input/output it is not an issue in that case, but I believe we should disable pushdown for this case too. I think this example explains the situation:

```scala
val dfs = Seq(
  Seq(Some(1), None).toDF("col.dots"),
  Seq(Some(1L), None).toDF("col.dots"),
  Seq(Some(1.0F), None).toDF("col.dots"),
  Seq(Some(1.0D), None).toDF("col.dots"),
  Seq(true, false).toDF("col.dots"),
  Seq("apple", null).toDF("col.dots"),
  Seq("apple", null).toDF("col.dots")
)

val predicates = Seq(
  "`col.dots` > 0",
  "`col.dots` >= 1L",
  "`col.dots` < 2.0",
  "`col.dots` <= 1.0D",
  "`col.dots` == true",
  "`col.dots` IS NOT NULL",
  "`col.dots` IS NULL"
)

dfs.zip(predicates).zipWithIndex.foreach { case ((df, predicate), i) =>
  val path = s"/tmp/abcd$i"
  df.write.mode("overwrite").parquet(path)
  spark.read.parquet(path).where(predicate).show()
}
```

```
+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
|    null|
+--------+
```
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116946732

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameTypeMap = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+    // with missing columns. The incorrect results could be got from Parquet when we push down
+    // filters for the column having dots in the names. Thus, we do not push down such filters.
--- End diff --

It seems to me that a missing column is treated like a NULL value. The results will change only for some predicates, e.g., `IsNull` and `IsNotNull`. For other predicates, can we still push them down?
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116879339

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -168,6 +168,11 @@ private[parquet] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema)
 
+    // Note that, we do not push down filters for columns having dots in the names. In Parquet
+    // 1.8.2, currently, column APIs in Parquet's `FilterApi` only allow dot-separated names
+    // so here we simply avoid this case. See SPARK-20364.
--- End diff --

Sure, it looks much nicer.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon closed the pull request at: https://github.com/apache/spark/pull/18000
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
GitHub user HyukjinKwon reopened a pull request: https://github.com/apache/spark/pull/18000

[SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names

## What changes were proposed in this pull request?

This is an alternative workaround that simply avoids predicate pushdown for columns having dots in their names. It takes a different approach from https://github.com/apache/spark/pull/17680. The downside of this PR is that it literally does not push down filters on such columns in Parquet files at all (neither at record level nor at row-group level), whereas the downside of the approach in that PR is that it does not use Parquet's API properly, relying on a hack to support this case. I assume we prefer the safe way here, using the Parquet API properly, but this PR does close that one, as here we are basically just avoiding the problem. This looks like a simple workaround, and it is probably fine given that the problem is arguably a rather corner case (although it might end up reading whole row groups under the hood; neither option looks best).

Currently, if there are dots in the column name, predicate pushdown fails in Parquet.

**With dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```

**Without dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```

**After**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```

## How was this patch tested?

Unit tests added in `ParquetFilterSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-20364-workaround

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18000.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18000

commit e94560e07dc578154e222896915fb29e98869480
Author: hyukjinkwon
Date: 2017-05-16T09:39:18Z

    Disable Parquet predicate pushdown for fields having dots in the names
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116879168

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -168,6 +168,11 @@ private[parquet] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema)
 
+    // Note that, we do not push down filters for columns having dots in the names. In Parquet
+    // 1.8.2, currently, column APIs in Parquet's `FilterApi` only allow dot-separated names
+    // so here we simply avoid this case. See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = dataTypeOf.contains(name) && !name.contains(".")
--- End diff --

(this is a nested function)
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116805464

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -168,6 +168,11 @@ private[parquet] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema)
 
+    // Note that, we do not push down filters for columns having dots in the names. In Parquet
+    // 1.8.2, currently, column APIs in Parquet's `FilterApi` only allow dot-separated names
+    // so here we simply avoid this case. See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = dataTypeOf.contains(name) && !name.contains(".")
--- End diff --

`private`
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116805363

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -168,6 +168,11 @@ private[parquet] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema)
 
+    // Note that, we do not push down filters for columns having dots in the names. In Parquet
+    // 1.8.2, currently, column APIs in Parquet's `FilterApi` only allow dot-separated names
+    // so here we simply avoid this case. See SPARK-20364.
--- End diff --

Parquet does not allow `.` in the column name because it is used as a column path delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates with missing columns. Since it can generate the incorrect results, we block the filter pushdown when the filters have the column having dots in the names. See SPARK-20364.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116801592

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -168,6 +168,11 @@ private[parquet] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema)
--- End diff --

`dataTypeOf` -> `nameTypeMap`
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18000#discussion_r116800201

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
@@ -168,6 +168,11 @@ private[parquet] object ParquetFilters {
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema)
 
+    // Note that, we do not push down filters for columns having dots in the names. In Parquet
+    // 1.8.2, currently, column APIs in Parquet's `FilterApi` only allow dot-separated names
+    // so here we simply avoid this case. See SPARK-20364.
--- End diff --

Added the link to PARQUET-389 in the JIRA SPARK-20364.
[GitHub] spark pull request #18000: [SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/18000

[SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names

## What changes were proposed in this pull request?

This is an alternative workaround that simply avoids predicate pushdown for columns having dots in their names. It takes a different approach from https://github.com/apache/spark/pull/17680. The downside of this PR is that it literally does not push down filters on such columns in Parquet files at all (neither at record level nor at row-group level), whereas the downside of the approach in that PR is that it does not use Parquet's API properly, relying on a hack to support this case. I assume we prefer the safe way here, using the Parquet API properly, but this PR does close that one, as here we are basically just avoiding the problem. This looks like a simple workaround, and it is probably fine given that the problem is arguably a rather corner case (although it might end up reading whole row groups under the hood; neither option looks best).

Currently, if there are dots in the column name, predicate pushdown fails in Parquet.

**With dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```

**Without dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```

**After**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```

## How was this patch tested?

Unit tests added in `ParquetFilterSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-20364-workaround

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18000.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18000

commit e94560e07dc578154e222896915fb29e98869480
Author: hyukjinkwon
Date: 2017-05-16T09:39:18Z

    Disable Parquet predicate pushdown for fields having dots in the names