[jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results

2018-08-30 Thread Janne Jaanila (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16597350#comment-16597350
 ] 

Janne Jaanila commented on SPARK-23298:
---

[~mjukiewicz] You are right. After taking a closer look, I found that the issue 
in our code was caused by a dropDuplicates call placed in the wrong spot. Thank 
you for the clear explanation!

> distinct.count on Dataset/DataFrame yields non-deterministic results
> 
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL, YARN
>Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>Reporter: Mateusz Jukiewicz
>Priority: Major
>  Labels: Correctness, CorrectnessBug, correctness
>
> This is what happens (EDIT - managed to get a reproducible example):
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> // The spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> val df = spark.range(1000).withColumn("col1", (rand() * 
> 1000).cast("long")).withColumn("col2", (rand() * 
> 1000).cast("long")).drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256                                                       
>      
> df.distinct.count
> // res2: Long = 55   {code}
> -The _text_dataset.out_ file is a dataset with one string per line. The 
> string has alphanumeric characters as well as colons and spaces. The line 
> length does not exceed 1200. I don't think that's important though, as the 
> issue appeared on various other datasets, I just tried to narrow it down to 
> the simplest possible case.- (the case is now fully reproducible with the 
> above code)
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always 
> able to correctly read all input records (which was shown in the UI), and 
> that number got malformed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related to the internal way Encoders perform hashing. 
> The reason might be:
>  ** in a simple `distinct.count` invocation, there are in total three 
> hash-related stages (called `HashAggregate`),
>  ** excerpt from scaladoc for `distinct` method says:
> {code:java}
>* @note Equality checking is performed directly on the 

[jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results

2018-08-29 Thread Mateusz Jukiewicz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596366#comment-16596366
 ] 

Mateusz Jukiewicz commented on SPARK-23298:
---

[~jansi]

I can see two issues in your code.
 # If you want to fully reproduce my example, please save the generated 
dataframe first and then read it from disk. When working on it directly, rand() 
generates different numbers on every action (which is expected), and that alone 
can yield non-deterministic results.
 # I believe dropDuplicates() works in a non-deterministic fashion, which is 
expected. You end up with unique values in col1, but you don't know which row 
will remain for each value; therefore different col2 values may survive in 
every run (meaning different numbers of distinct values), which can cause the 
effect you're observing (see the runnable sketch after the example below).

Example with a small sample dataframe:
{code:java}
df
--
c1, c2
--
1 , 2
1 , 3
1 , 3
2 , 2
2 , 3

Run1:
--
df.dropDuplicates("c1") >
--
c1, c2
--
1 , 2
2 , 2

.dropDuplicates("c2") >
--
c1, c2
--
1 , 2
.count = 1

Run2:
--
df.dropDuplicates("c1") >
--
c1, c2
--
1 , 2
2 , 3

.dropDuplicates("c2") >
--
c1, c2
--
1 , 2
2 , 3
.count = 2{code}
As far as I know, behaviour like that is totally expected.
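
For completeness, here is a minimal runnable spark-shell sketch of the same 
effect (not part of the original report; the toy rows mirror the table above). 
Which row survives the first dropDuplicates is unspecified, so the chained 
count may differ between runs, whereas deduplicating on both columns at once is 
stable for a fixed input:
{code:java}
// Toy input mirroring the table above; toDF is available in spark-shell via
// the pre-imported spark.implicits._.
val df = Seq((1L, 2L), (1L, 3L), (1L, 3L), (2L, 2L), (2L, 3L)).toDF("c1", "c2")

// Non-deterministic: an arbitrary row survives per c1 value, so the second
// dedup may see {2} or {2, 3} in c2, and the count may be 1 or 2.
df.dropDuplicates("c1").dropDuplicates("c2").count()

// Stable for this fixed input: deduplicate on the full column set in one step.
df.dropDuplicates("c1", "c2").count()   // 4 distinct (c1, c2) pairs
{code}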

> distinct.count on Dataset/DataFrame yields non-deterministic results
> 
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL, YARN
>Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>Reporter: Mateusz Jukiewicz
>Priority: Major
>  Labels: Correctness, CorrectnessBug, correctness
>
> This is what happens (EDIT - managed to get a reproducible example):
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> // The spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> val df = spark.range(1000).withColumn("col1", (rand() * 
> 1000).cast("long")).withColumn("col2", (rand() * 
> 1000).cast("long")).drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256                                                       
>      
> df.distinct.count
> // res2: Long = 55   {code}
> -The _text_dataset.out_ file is a dataset with one string per line. The 
> string has alphanumeric characters as well as colons and spaces. The line 
> length does not exceed 1200. I don't think that's important though, as the 
> issue appeared on various other datasets, I just tried to narrow it down to 
> the simplest possible case.- (the case is now fully reproducible with the 
> above code)
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The proble

[jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results

2018-08-29 Thread Janne Jaanila (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596324#comment-16596324
 ] 

Janne Jaanila commented on SPARK-23298:
---

I have faced a similar problem with multiple dropDuplicates() calls. I was able 
to reproduce it with Mateusz's dataframe in spark-shell:

 
{code:java}
./spark-shell --master yarn --deploy-mode client

scala> val df = spark.range(1000).withColumn("col1", (rand() * 
1000).cast("long")).withColumn("col2", (rand() * 1000).cast("long")).drop("id")
df: org.apache.spark.sql.DataFrame = [col1: bigint, col2: bigint]
scala> df.dropDuplicates("col1").dropDuplicates("col2").count()
res77: Long = 639 
scala> df.dropDuplicates("col1").dropDuplicates("col2").count()
res78: Long = 638
scala> df.dropDuplicates("col1").dropDuplicates("col2").count()
res79: Long = 618
scala> df.dropDuplicates("col1").dropDuplicates("col2").count()
res80: Long = 635
{code}
 

Count with a single dropDuplicates call works as expected.

I'm using Spark 2.3.1. We recently upgraded from 2.0.2 because of this issue.
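
To rule out rand() regeneration as a factor, a hedged sketch is to write the 
generated frame out and read it back first (the parquet path below is purely 
illustrative); any remaining variation then comes from the chained 
dropDuplicates alone:
{code:java}
// Persist the generated data first so rand() is not re-evaluated per action;
// any count variation left over comes from the chained dropDuplicates.
val generated = spark.range(1000)
  .withColumn("col1", (rand() * 1000).cast("long"))
  .withColumn("col2", (rand() * 1000).cast("long"))
  .drop("id")
generated.write.mode("overwrite").parquet("/tmp/dedup_repro.parquet")

val df = spark.read.parquet("/tmp/dedup_repro.parquet")
df.dropDuplicates("col1").dropDuplicates("col2").count() // may still vary run to run
df.dropDuplicates("col1", "col2").count()                // stable for the saved data
{code}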

> distinct.count on Dataset/DataFrame yields non-deterministic results
> 
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL, YARN
>Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>Reporter: Mateusz Jukiewicz
>Priority: Major
>  Labels: Correctness, CorrectnessBug, correctness
>
> This is what happens (EDIT - managed to get a reproducible example):
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> // The spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> val df = spark.range(1000).withColumn("col1", (rand() * 
> 1000).cast("long")).withColumn("col2", (rand() * 
> 1000).cast("long")).drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256                                                       
>      
> df.distinct.count
> // res2: Long = 55   {code}
> -The _text_dataset.out_ file is a dataset with one string per line. The 
> string has alphanumeric characters as well as colons and spaces. The line 
> length does not exceed 1200. I don't think that's important though, as the 
> issue appeared on various other datasets, I just tried to narrow it down to 
> the simplest possible case.- (the case is now fully reproducible with the 
> above code)
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always 
> able to correctly read all input records (which was shown in the UI), and 
> that number got malformed after the exchange phase:
>  ** correct exe

[jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results

2018-08-12 Thread Mateusz Jukiewicz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577741#comment-16577741
 ] 

Mateusz Jukiewicz commented on SPARK-23298:
---

[~tgraves]

I edited the issue description and added a reproducible example which you can 
try out. Please keep in mind it might take several spark sessions of "distinct 
counting" to observe it.

On the other hand, I tried and cannot reproduce the issue on Spark 2.3.1. 
Therefore the aforementioned SPARK-23207 could have fixed this one as well. 
Obviously, it would probably be best if you managed to reproduce it on Spark 
2.2 and confirm that SPARK-23207 fixes this one too. But if you guys don't have 
time for that, I'm fine with closing this one right away.

> distinct.count on Dataset/DataFrame yields non-deterministic results
> 
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL, YARN
>Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>Reporter: Mateusz Jukiewicz
>Priority: Major
>  Labels: Correctness, CorrectnessBug, correctness
>
> This is what happens (EDIT - managed to get a reproducible example):
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> // The spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> val df = spark.range(1000).withColumn("col1", (rand() * 
> 1000).cast("long")).withColumn("col2", (rand() * 
> 1000).cast("long")).drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256                                                       
>      
> df.distinct.count
> // res2: Long = 55   {code}
> -The _text_dataset.out_ file is a dataset with one string per line. The 
> string has alphanumeric characters as well as colons and spaces. The line 
> length does not exceed 1200. I don't think that's important though, as the 
> issue appeared on various other datasets, I just tried to narrow it down to 
> the simplest possible case.- (the case is now fully reproducible with the 
> above code)
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always 
> able to correctly read all input records (which was shown in the UI), and 
> that number got malformed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read:

[jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results

2018-08-12 Thread Mateusz Jukiewicz (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577734#comment-16577734
 ] 

Mateusz Jukiewicz commented on SPARK-23298:
---

[~tgraves]

I have not yet tried to reproduce this on Spark 2.3, but I will, and I will let 
you know whether it helps.

I'm not doing any explicit repartitioning, but the `distinct` operation does 
trigger a shuffle and sends the data over the wire. Additionally, if the number 
of partitions in the source dataframe differs from 
`spark.sql.shuffle.partitions`, then the number of partitions will change as 
well. I'm not sure whether either of these counts as "repartitioning", but data 
is exchanged between the nodes.
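
As an illustrative check of that partition behaviour in spark-shell (the 
parquet path reuses the repro from the description; this assumes a plain 
Spark 2.x setup without adaptive execution):
{code:java}
// distinct introduces an Exchange: the scan stage follows the input splits,
// while the post-shuffle stage uses spark.sql.shuffle.partitions.
val df = spark.read.parquet("/test.parquet")   // path from the repro above

println(df.rdd.getNumPartitions)               // driven by the input file splits
println(df.distinct.rdd.getNumPartitions)      // spark.sql.shuffle.partitions

df.distinct.explain()                          // HashAggregate / Exchange / HashAggregate plan
{code}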

> distinct.count on Dataset/DataFrame yields non-deterministic results
> 
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL, YARN
>Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>Reporter: Mateusz Jukiewicz
>Priority: Major
>  Labels: Correctness, CorrectnessBug, correctness
>
> This is what happens:
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> */
> val dataset = spark.read.textFile("/text_dataset.out")
> dataset.distinct.count
> // res0: Long = 24025868
> dataset.distinct.count
> // res1: Long = 24014227{code}
> The _text_dataset.out_ file is a dataset with one string per line. The string 
> has alphanumeric characters as well as colons and spaces. The line length 
> does not exceed 1200. I don't think that's important though, as the issue 
> appeared on various other datasets, I just tried to narrow it down to the 
> simplest possible case.
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always 
> able to correctly read all input records (which was shown in the UI), and 
> that number got malformed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related to the internal way Encoders perform hashing. 
> The reason might be:
>  ** in a simple `distinct.count` invocation, there are in total three 
> hash-related stages (called `HashAggregate`),
>  ** excerpt from scaladoc for `distinct` method says:
> {code:java}
>* @note Equality checking is performed directly on the encoded 
> representation of the data
>* and thus is not affected by a custom `equals` function defined on 
> `T`.{code}

[jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results

2018-08-10 Thread Thomas Graves (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576709#comment-16576709
 ] 

Thomas Graves commented on SPARK-23298:
---

[~mjukiewicz] Have you tried Spark with the fix for SPARK-23207, and can you 
still reproduce? I don't see your code doing any repartitions.

> distinct.count on Dataset/DataFrame yields non-deterministic results
> 
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL, YARN
>Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>Reporter: Mateusz Jukiewicz
>Priority: Major
>  Labels: Correctness, CorrectnessBug, correctness
>
> This is what happens:
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> */
> val dataset = spark.read.textFile("/text_dataset.out")
> dataset.distinct.count
> // res0: Long = 24025868
> dataset.distinct.count
> // res1: Long = 24014227{code}
> The _text_dataset.out_ file is a dataset with one string per line. The string 
> has alphanumeric characters as well as colons and spaces. The line length 
> does not exceed 1200. I don't think that's important though, as the issue 
> appeared on various other datasets, I just tried to narrow it down to the 
> simplest possible case.
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always 
> able to correctly read all input records (which was shown in the UI), and 
> that number got malformed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related to the internal way Encoders perform hashing. 
> The reason might be:
>  ** in a simple `distinct.count` invocation, there are in total three 
> hash-related stages (called `HashAggregate`),
>  ** excerpt from scaladoc for `distinct` method says:
> {code:java}
>* @note Equality checking is performed directly on the encoded 
> representation of the data
>* and thus is not affected by a custom `equals` function defined on 
> `T`.{code}
>  * One of my suspicions was the number of partitions we're using (2154). This 
> is greater than 2000, which means that a different data structure (i.e. 
> _HighlyCompressedMapStatus_ instead of _CompressedMapStatus_) will be used for 
> book-keeping during the shuffle. Unfortunately, after decreasing the number 
> below this threshold, the problem still occurs.
>  * It's easie

[jira] [Commented] (SPARK-23298) distinct.count on Dataset/DataFrame yields non-deterministic results

2018-02-12 Thread Mateusz Jukiewicz (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360415#comment-16360415
 ] 

Mateusz Jukiewicz commented on SPARK-23298:
---

Not sure, but it seems like it could be related to SPARK-23207.

> distinct.count on Dataset/DataFrame yields non-deterministic results
> 
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL, YARN
>Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using 
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>  
>  
>Reporter: Mateusz Jukiewicz
>Priority: Major
>
> This is what happens:
> {code:java}
> /* Exemplary spark-shell starting command 
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> */
> val dataset = spark.read.textFile("/text_dataset.out")
> dataset.distinct.count
> // res0: Long = 24025868
> dataset.distinct.count
> // res1: Long = 24014227{code}
> The _text_dataset.out_ file is a dataset with one string per line. The string 
> has alphanumeric characters as well as colons and spaces. The line length 
> does not exceed 1200. I don't think that's important though, as the issue 
> appeared on various other datasets, I just tried to narrow it down to the 
> simplest possible case.
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both spark 2.2 and spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. laptop) in spark 
> local mode.
>  * It seems that once the correct count is computed, it is not possible to 
> reproduce the issue in the same spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results consecutively, but once it got 
> right, it always returned the correct value. I had to re-run spark-shell to 
> observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.text or 
> read.textFile).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container has failed in those multiple invalid executions.
>  * YARN doesn't show any warnings or errors in those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark was always 
> able to correctly read all input records (which was shown in the UI), and 
> that number got malformed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related to the internal way Encoders perform hashing. 
> The reason might be:
>  ** in a simple `distinct.count` invocation, there are in total three 
> hash-related stages (called `HashAggregate`),
>  ** excerpt from scaladoc for `distinct` method says:
> {code:java}
>* @note Equality checking is performed directly on the encoded 
> representation of the data
>* and thus is not affected by a custom `equals` function defined on 
> `T`.{code}
>  * One of my suspicions was the number of partitions we're using (2154). This 
> is greater than 2000, which means that a different data structure (i.e. 
> _HighlyCompressedMapStatus_ instead of _CompressedMapStatus_) will be used for 
> book-keeping during the shuffle. Unfortunately, after decreasing the number 
> below this threshold, the problem still occurs.
>  * It's easier to reproduce the issue with a large number of partitions.
>  * Another of my suspicions was that it's somehow related to the num