[ https://issues.apache.org/jira/browse/SPARK-17962 ]
Sean Owen resolved SPARK-17962.
-------------------------------
    Resolution: Duplicate

DataFrame/Dataset join not producing correct results in Spark 2.0/Yarn
----------------------------------------------------------------------

                 Key: SPARK-17962
                 URL: https://issues.apache.org/jira/browse/SPARK-17962
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, YARN
    Affects Versions: 2.0.0
         Environment: CentOS 7.2, Hadoop 2.7.2, Spark 2.0.0
            Reporter: Stephen Hankinson

The environment can be reproduced via the Deploy to Azure button in this git repo: https://github.com/shankinson/spark (the cluster name must match the resource group name for the deployment to launch properly). Log in with username hadoop and launch the shell with:

/home/hadoop/spark-2.0.0-bin-hadoop2.7/bin/spark-shell --master yarn --deploy-mode client --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=in-memory --driver-memory 10g --driver-cores 4

We have a cluster running Apache Spark 2.0 on Hadoop 2.7.2, CentOS 7.2. We wrote some new code using the Spark DataFrame/Dataset APIs and are seeing incorrect results on a join after writing data to Windows Azure Storage Blobs (the default HDFS location) and reading it back. I have been able to reproduce the issue with the following snippet of code running on the cluster:

case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(CentroidClusterScore(0, 1, 1.0), CentroidClusterScore(1, 0, 1.0), CentroidClusterScore(2, 2, 1.0))).toDS

dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension")).show

This outputs:

|| user||dimension||score||
|12345|        0|  1.0|

||dimension||cluster||score||
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|

|| user||dimension||score||dimension||cluster||score||
|12345|        0|  1.0|        0|      1|  1.0|

which is correct. However, after writing the data out and reading it back:

dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")

val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]

dims2.show
cent2.show
dims2.join(cent2, dims2("dimension") === cent2("dimension")).show

the output is:

|| user||dimension||score||
|12345|        0|  1.0|

||dimension||cluster||score||
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|

|| user||dimension||score||dimension||cluster||score||
|12345|        0|  1.0|     null|   null| null|

However, using the RDD API produces the correct result:

dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row => (row.dimension, row) ) ).take(5)

res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))

We have tried changing the output format to ORC instead of Parquet, but we see the same results. Running Spark 2.0 locally, not on a cluster, does not exhibit the issue, and running Spark in local mode on the master node of the Hadoop cluster also works; we only see the issue when running on top of YARN.

This also seems very similar to this issue: https://issues.apache.org/jira/browse/SPARK-10896
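For anyone digging into this or the issue it duplicates, one cheap way to see where the two code paths diverge would be to compare the physical plans of the working in-memory join and the failing post-Parquet join. This is only a diagnostic sketch; it assumes the Datasets defined above and uses the standard Dataset.explain API:

// Print the parsed/analyzed/optimized/physical plans for both joins; a
// difference in join strategy or shuffle exchanges between the two would
// be a useful clue about why the second join produces nulls.
dims.join(cent, dims("dimension") === cent("dimension")).explain(true)
dims2.join(cent2, dims2("dimension") === cent2("dimension")).explain(true)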
We have also determined this appears to be related to the memory settings of the cluster. The worker machines have 56000MB available; we see the issue when the NodeManager memory is set to 54784M and the executor memory to 48407M. Lowering the executor memory to something like 28407M stops the issue from happening.
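For reference, a minimal sketch of where that executor-memory value would be applied in a standalone driver; the app name is made up for illustration, and since spark.executor.memory is read when the SparkContext starts, the spark-shell equivalent is the --executor-memory 28407m launch flag rather than a runtime setting:

import org.apache.spark.sql.SparkSession

// Pin executor memory to the value the reporter found avoids the issue
// (28407M); this must be configured before the SparkContext starts.
val spark = SparkSession.builder()
  .appName("spark-17962-join-repro") // hypothetical app name
  .config("spark.executor.memory", "28407m")
  .getOrCreate()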