[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-23298.
----------------------------------
    Resolution: Incomplete

> distinct.count on Dataset/DataFrame yields non-deterministic results
> --------------------------------------------------------------------
>
>                 Key: SPARK-23298
>                 URL: https://issues.apache.org/jira/browse/SPARK-23298
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, SQL, YARN
>    Affects Versions: 2.1.0, 2.2.0
>         Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>            Reporter: Mateusz Jukiewicz
>            Priority: Major
>              Labels: Correctness, CorrectnessBug, bulk-closed, correctness
>
> This is what happens (EDIT - managed to get a reproducible example):
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> // The spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> val df = spark.range(10000000).withColumn("col1", (rand() * 
> 1000).cast("long")).withColumn("col2", (rand() * 
> 1000).cast("long")).drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256                                                       
>      
> df.distinct.count
> // res2: Long = 999955   {code}
> -The _text_dataset.out_ file is a dataset with one string per line. The 
> string has alphanumeric characters as well as colons and spaces. The line 
> length does not exceed 1200. I don't think that's important though, as the 
> issue appeared on various other datasets, I just tried to narrow it down to 
> the simplest possible case.- (the case is now fully reproducible with the 
> above code)
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always 
> able to correctly read all input records (which was shown in the UI), and 
> that number got malformed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related with the internal way of Encoders hashing. 
> The reason might be:
>  ** in a simple `distinct.count` invocation, there are in total three 
> hash-related stages (called `HashAggregate`),
>  ** excerpt from scaladoc for `distinct` method says:
> {code:java}
>    * @note Equality checking is performed directly on the encoded 
> representation of the data
>    * and thus is not affected by a custom `equals` function defined on 
> `T`.{code}
>  * One of my suspicions was the number of partitions we're using (2154). This 
> is greater than 2000, which means that a different data structure (i.e. 
> _HighlyCompressedMapStatus_instead of _CompressedMapStatus_) will be used for 
> book-keeping during the shuffle. Unfortunately after decreasing the number 
> below this threshold the problem still occurs.
>  * It's easier to reproduce the issue with a large number of partitions.
>  * One of my another suspicions was that it's somehow related to the number 
> of blocks on the HDFS (974). I was able to reproduce the problem with both 
> less and more partitions than this value, so I think this is not the case.
>  * Final note: It looks like for some reason the data gets duplicated in the 
> process of data exchange during the shuffle (because shuffle read sees more 
> elements than shuffle write has written).
> Please let me know if you have any other questions.
> I couldn't find much about similar problems on the Web, the only thing I 
> found was on the spark mailing list where someone using PySpark has found 
> that one of his/her executors was hashing things differently than the other 
> one which caused a similar issue.
> I didn't include a reproducible example as this is just a long file with 
> strings and as this occurred on many different datasets, I doubt it's 
> data-related. If that's necessary though, please let me know and I will try 
> to prepare an example.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to