[ https://issues.apache.org/jira/browse/SPARK-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-8466:
------------------------------------
    Description:

Input Data: a parquet file stored in hdfs://xxxx/data with two columns (lifeAverageBitrateKbps int, playtimems int)
=================================
Scripts used in spark-shell:
{code}
val df = sqlContext.parquetFile("hdfs://xxxx/data")
import org.apache.spark.sql.types._
val cols = df.schema.fields.map { f =>
  val dataType = f.dataType match {
    case DoubleType | FloatType => DecimalType.Unlimited
    case t => t
  }
  df.col(f.name).cast(dataType).as(f.name)
}
df.select(cols: _*).registerTempTable("t")
val query =
  """
    |SELECT avg(cleanedplaytimems),
    |       count(1)
    |FROM
    |  (SELECT 0 key,
    |          avg(lifeAverageBitrateKbps) avgbitrate
    |   FROM anon_sdm2_ss
    |   WHERE lifeAverageBitrateKbps > 0) t1,
    |  (SELECT 0 key,
    |          lifeAverageBitrateKbps,
    |          if(playtimems > 0, playtimems, 0) cleanedplaytimems
    |   FROM anon_sdm2_ss
    |   WHERE lifeAverageBitrateKbps > 0) t2
    |WHERE t1.key=t2.key
    |  AND t2.lifeAverageBitrateKbps < 0.5 * t1.avgbitrate
  """.stripMargin
sqlContext.sql(query).explain(true)
{code}
===========================
Output:
{code}
== Analyzed Logical Plan ==
Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1) AS _c1#112L]
 Filter ((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105, DoubleType) < (0.5 * avgbitrate#108)))
  Join Inner, None
   Subquery t1
    Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105, LongType)) AS avgbitrate#108]
     Filter (lifeAverageBitrateKbps#105 > 0)
      Subquery anon_sdm2_ss
       Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
        Relation[lifeaveragebitratekbps#27,playtimems#89] ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
   Subquery t2
    Project [0 AS key#109,lifeAverageBitrateKbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#106 > 0),playtimems#106,0) AS cleanedplaytimems#110]
     Filter (lifeAverageBitrateKbps#105 > 0)
      Subquery anon_sdm2_ss
       Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
        Relation[lifeaveragebitratekbps#27,playtimems#89] ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)

== Optimized Logical Plan ==
Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1) AS _c1#112L]
 Project [cleanedplaytimems#110]
  Join Inner, Some(((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105, DoubleType) < (0.5 * avgbitrate#108))))
   Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105, LongType)) AS avgbitrate#108]
    Project [lifeaveragebitratekbps#27 AS lifeaveragebitratekbps#105]
     !Filter (lifeAverageBitrateKbps#105 > 0)
      Relation[lifeaveragebitratekbps#27,playtimems#89] ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
   Project [0 AS key#109,lifeaveragebitratekbps#27 AS lifeaveragebitratekbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#89 AS playtimems#106 > 0),playtimems#89 AS playtimems#106,0) AS cleanedplaytimems#110]
    !Filter (lifeAverageBitrateKbps#105 > 0)
     Relation[lifeaveragebitratekbps#27,playtimems#89] ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
{code}

Note: the Filter nodes prefixed with {{!}} in the optimized plan are unresolved. The optimizer pushed each Filter below the Project that defines lifeaveragebitratekbps#105 as an alias of lifeaveragebitratekbps#27, but did not rewrite the filter condition, so the pushed-down predicate still references attribute #105 while the relation beneath it only produces attribute #27.


> Bug in SQL Optimizer: Unresolved Attribute after pushing Filter below Project
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-8466
>                 URL: https://issues.apache.org/jira/browse/SPARK-8466
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.1, 1.4.0
>            Reporter: Kai Zeng
>            Priority: Critical

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
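The invariant the optimizer breaks here can be sketched with a toy model. When a predicate-pushdown rule moves a Filter below a Project, any attribute the predicate references that the Project defines as an alias must be substituted with the alias's child expression, so the condition stays resolvable against the Project's input. The classes below are illustrative stand-ins, not Catalyst's actual API:

```scala
// Toy expression tree, loosely modeled on a logical-plan expression.
sealed trait Expr
case class Attr(name: String) extends Expr               // attribute reference
case class Alias(child: Expr, name: String) extends Expr // "child AS name" in a Project list
case class Gt(left: Expr, right: Expr) extends Expr      // left > right
case class Lit(v: Int) extends Expr                      // integer literal

object PushDown {
  // Rewrite `pred` so that references to attributes produced by the Project's
  // alias list are replaced by the underlying child expressions. Without this
  // substitution, a filter pushed below the Project references attributes that
  // no longer exist at that depth -- the unresolved !Filter seen in the plan.
  def substitute(pred: Expr, projectList: Seq[Alias]): Expr = {
    val aliasMap: Map[String, Expr] = projectList.map(a => a.name -> a.child).toMap
    def rewrite(e: Expr): Expr = e match {
      case Attr(n)     => aliasMap.getOrElse(n, e) // substitute aliased attributes
      case Gt(l, r)    => Gt(rewrite(l), rewrite(r))
      case Alias(c, n) => Alias(rewrite(c), n)
      case _           => e
    }
    rewrite(pred)
  }
}
```

Applied to the shapes in this report: the Project defines `lifeaveragebitratekbps#105` as an alias of `lifeaveragebitratekbps#27`, so the pushed predicate must end up referencing `#27`:

```scala
val projectList = Seq(Alias(Attr("lifeaveragebitratekbps#27"), "lifeAverageBitrateKbps#105"))
val pred   = Gt(Attr("lifeAverageBitrateKbps#105"), Lit(0))
val pushed = PushDown.substitute(pred, projectList)
// pushed references #27, which the Relation below the Project can resolve
assert(pushed == Gt(Attr("lifeaveragebitratekbps#27"), Lit(0)))
```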