Utkarsh Sharma created SPARK-26974:
--------------------------------------

             Summary: Invalid data in grouped cached dataset, formed by joining 
a large cached dataset with a small dataset
                 Key: SPARK-26974
                 URL: https://issues.apache.org/jira/browse/SPARK-26974
             Project: Spark
          Issue Type: Bug
          Components: Java API, Spark Core, SQL
    Affects Versions: 2.2.0
            Reporter: Utkarsh Sharma


The initial datasets are derived from Hive tables using the spark.table() 
function.

Dataset descriptions:

*+Sales+* dataset (close to 10 billion rows) with the following columns (and 
sample rows):
||ItemId (bigint)||CustomerId (bigint)||qty_sold (bigint)||
|1|1|20|
|1|2|30|
|2|1|40|

 

*+Customer+* dataset (close to 50,000 rows) with the following columns (and 
sample rows):
||CustomerId (bigint)||CustomerGrpNbr (smallint)||
|1|1|
|2|2|
|3|1|

 

I am doing the following steps:
 # Caching the sales dataset with close to 10 billion rows.
 # Doing an inner join of the 'sales' dataset with the 'customer' dataset.
 # Doing a group-by on the resultant dataset, based on the CustomerGrpNbr 
column, to get the sum(qty_sold) and stddev(qty_sold) values for the customer 
groups.
 # Caching the resultant grouped dataset.
 # Doing a .count() on the grouped dataset.

The step 5 count is supposed to return only 20, because 
customer.select("CustomerGrpNbr").distinct().count() returns 20. However, 
step 5 actually returns a value of around 65,000, and the count varies 
between runs.
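As a sanity check on the expected count, the join-then-group arithmetic can be sketched in plain Python (no Spark needed), using the sample rows from the tables above. The grouped result can only ever have one row per distinct CustomerGrpNbr value that survives the join, which is why 65,000 rows from 20 groups cannot be correct. Note that `pstdev` (population standard deviation) merely stands in for Spark's `stddev` here for illustration; Spark's default is the sample standard deviation.

```python
from collections import defaultdict
from statistics import pstdev

# sales rows: (ItemId, CustomerId, qty_sold) -- the sample rows above
sales = [(1, 1, 20), (1, 2, 30), (2, 1, 40)]
# customer mapping: CustomerId -> CustomerGrpNbr -- the sample rows above
customer = {1: 1, 2: 2, 3: 1}

# Inner join on CustomerId, then group qty_sold values by CustomerGrpNbr
groups = defaultdict(list)
for item_id, cust_id, qty in sales:
    if cust_id in customer:
        groups[customer[cust_id]].append(qty)

# One aggregate row per group: (sum, stddev) keyed by CustomerGrpNbr
agg = {grp: (sum(qs), pstdev(qs)) for grp, qs in groups.items()}

# The row count of the grouped result equals the number of distinct
# group numbers present after the join -- here 2, never more.
print(len(agg))  # -> 2
```

With the full customer table the same logic bounds the grouped count at 20, the number of distinct CustomerGrpNbr values.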

Following are the commands I am running in spark-shell:
{code:java}
// spark-shell does not auto-import the SQL aggregate functions
import org.apache.spark.sql.functions.{sum, stddev}

var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
sales.cache()
finalDf.cache()
finalDf.count() // returns around 65k rows; the count varies on each run
customer.select("CustomerGrpNbr").distinct().count() // returns 20{code}
I have been able to replicate the same behavior using the Java API as well. 
This anomalous behavior disappears, however, when I remove the caching 
statements. I.e., if I run the following in spark-shell, it works as expected:
{code:java}
// spark-shell does not auto-import the SQL aggregate functions
import org.apache.spark.sql.functions.{sum, stddev}

var sales = spark.table("sales_table")
var customer = spark.table("customer_table")
var finalDf = sales.join(customer, "CustomerId")
  .groupBy("CustomerGrpNbr")
  .agg(sum("qty_sold"), stddev("qty_sold"))
finalDf.count() // returns 20
customer.select("CustomerGrpNbr").distinct().count() // returns 20
{code}
The Hive tables from which the datasets are built do not change during this 
entire process, so why does caching cause this problem?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
