Utkarsh Sharma created SPARK-26974:
--------------------------------------

             Summary: Invalid data in grouped cached dataset, formed by joining a large cached dataset with a small dataset
                 Key: SPARK-26974
                 URL: https://issues.apache.org/jira/browse/SPARK-26974
             Project: Spark
          Issue Type: Bug
          Components: Java API, Spark Core, SQL
    Affects Versions: 2.2.0
            Reporter: Utkarsh Sharma
The initial datasets are derived from Hive tables using the spark.table() function.

Dataset descriptions:

*+Sales+* dataset (close to 10 billion rows) with the following columns (and sample rows):
||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
|1|1|20|
|1|2|30|
|2|1|40|

+*Customer*+ dataset (close to 50,000 rows) with the following columns (and sample rows):
||CustomerId (bigint)||CustomerGrpNbr (smallint)||
|1|1|
|2|2|
|3|1|

I am doing the following steps:
# Caching the 'sales' dataset (close to 10 billion rows).
# Doing an inner join of 'sales' with the 'customer' dataset.
# Doing a group by on the resultant dataset, based on the CustomerGrpNbr column, to get sum(qty_sold) and stddev(qty_sold) values within the customer groups.
# Caching the resultant grouped dataset.
# Doing a .count() on the grouped dataset.

The count in step 5 should return only 20, because customer.select("CustomerGrpNbr").distinct().count() returns 20. However, step 5 returns a value of around 65,000 instead. Following are the commands I am running in spark-shell:

{code:java}
var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
sales.cache()
finalDf.cache()
finalDf.count() // returns around 65k, and the count keeps varying on each run
customer.select("CustomerGrpNbr").distinct().count() // returns 20
{code}

I have been able to replicate the same behavior using the Java API as well. This anomalous behavior disappears, however, when I remove the caching statements. I.e.,
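For reference, the invariant that the step-5 count violates can be sketched in plain Scala on the miniature sample rows above (no Spark needed; the collection names and values mirror the sample tables, everything else is illustrative): after an inner join, grouping by CustomerGrpNbr can never produce more groups than there are distinct CustomerGrpNbr values.

```scala
// Miniature stand-ins for the two tables, using the sample rows above.
val sales    = Seq((1L, 1L, 20L), (1L, 2L, 30L), (2L, 1L, 40L)) // (ItemId, CustomerId, qty_sold)
val customer = Seq((1L, 1), (2L, 2), (3L, 1))                   // (CustomerId, CustomerGrpNbr)

val custGrp = customer.toMap // CustomerId -> CustomerGrpNbr

// Inner join on CustomerId, then group by CustomerGrpNbr.
val joined  = sales.flatMap { case (_, cid, qty) => custGrp.get(cid).map(grp => (grp, qty)) }
val grouped = joined.groupBy(_._1).map { case (grp, rows) => grp -> rows.map(_._2).sum }

val distinctGroups = customer.map(_._2).distinct.size

// The grouped count is bounded by the number of distinct group keys, so a
// cached count far above distinct().count() can only mean corrupt data.
println(grouped.size)   // 2
println(distinctGroups) // 2
assert(grouped.size <= distinctGroups)
```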
If I run the following in spark-shell, it works as expected:

{code:java}
var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId").groupBy("CustomerGrpNbr").agg(sum("qty_sold"), stddev("qty_sold"))
finalDf.count() // returns 20
customer.select("CustomerGrpNbr").distinct().count() // returns 20
{code}

The tables in Hive from which the datasets are built do not change during this entire process. So why does the caching cause this problem?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
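For concreteness, here is what the aggregation should yield on the sample rows above, sketched in plain Scala without Spark (the collections mirror the sample tables; this only illustrates the expected semantics, which caching must not change; the sketch uses the sample standard deviation, matching Spark's stddev, and reports NaN for a single-row group):

```scala
// (ItemId, CustomerId, qty_sold) and (CustomerId, CustomerGrpNbr), from the sample rows.
val sales    = Seq((1L, 1L, 20.0), (1L, 2L, 30.0), (2L, 1L, 40.0))
val customer = Seq((1L, 1), (2L, 2), (3L, 1))
val custGrp  = customer.toMap

// Inner join, then group by CustomerGrpNbr, as in the spark-shell snippet.
val byGroup = sales
  .flatMap { case (_, cid, qty) => custGrp.get(cid).map(grp => (grp, qty)) }
  .groupBy(_._1)
  .map { case (grp, rows) =>
    val qtys = rows.map(_._2)
    val mean = qtys.sum / qtys.size
    // Sample standard deviation (divides by n - 1); undefined for one row.
    val stddev =
      if (qtys.size < 2) Double.NaN
      else math.sqrt(qtys.map(q => (q - mean) * (q - mean)).sum / (qtys.size - 1))
    grp -> (qtys.sum, stddev)
  }

println(byGroup(1))    // (60.0, ~14.14): group 1 holds qty_sold 20 and 40
println(byGroup(2)._1) // 30.0: group 2 holds only qty_sold 30, so its stddev is NaN
println(byGroup.size)  // 2 groups -- the count the cached pipeline should also report
```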