[ https://issues.apache.org/jira/browse/SPARK-17962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-17962.
-------------------------------
    Resolution: Duplicate

> DataFrame/Dataset join not producing correct results in Spark 2.0/Yarn
> ----------------------------------------------------------------------
>
>                 Key: SPARK-17962
>                 URL: https://issues.apache.org/jira/browse/SPARK-17962
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, YARN
>    Affects Versions: 2.0.0
>         Environment: Centos 7.2, Hadoop 2.7.2, Spark 2.0.0
>            Reporter: Stephen Hankinson
>
> The environment can be reproduced from this git repo using the Deploy to Azure 
> button: https://github.com/shankinson/spark (the cluster name must be the 
> same as the resource group name for this to launch properly). Log in with 
> username hadoop and launch the shell with:
> /home/hadoop/spark-2.0.0-bin-hadoop2.7/bin/spark-shell --master yarn --deploy-mode client --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=in-memory --driver-memory 10g --driver-cores 4
> We have a cluster running Apache Spark 2.0 on Hadoop 2.7.2, CentOS 7.2. We 
> have written some new code using the Spark DataFrame/Dataset APIs, but we are 
> seeing incorrect results from a join after writing data to, and then reading 
> it back from, Windows Azure Storage Blobs (the default HDFS location). I've 
> been able to reproduce the issue with the following snippet of code running 
> on the cluster:
> case class UserDimensions(user: Long, dimension: Long, score: Double)
> case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)
> val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
> val cent = sc.parallelize(Array(CentroidClusterScore(0, 1, 1.0),
>   CentroidClusterScore(1, 0, 1.0), CentroidClusterScore(2, 2, 1.0))).toDS
> dims.show
> cent.show
> dims.join(cent, dims("dimension") === cent("dimension") ).show
> outputs
> || user||dimension||score||
> |12345|        0|  1.0|
> ||dimension||cluster||score||
> |        0|      1|  1.0|
> |        1|      0|  1.0|
> |        2|      2|  1.0|
> || user||dimension||score||dimension||cluster||score||
> |12345|        0|  1.0|        0|      1|  1.0|
> which is correct. However, after writing the data out and reading it back, we see this:
> dims.write.mode("overwrite").save("/tmp/dims2.parquet")
> cent.write.mode("overwrite").save("/tmp/cent2.parquet")
> val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
> val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]
> dims2.show
> cent2.show
> dims2.join(cent2, dims2("dimension") === cent2("dimension") ).show
> outputs
> || user||dimension||score||
> |12345|        0|  1.0|
> ||dimension||cluster||score||
> |        0|      1|  1.0|
> |        1|      0|  1.0|
> |        2|      2|  1.0|
> || user||dimension||score||dimension||cluster||score||
> |12345|        0|  1.0|     null|   null| null|
> However, using the RDD API produces the correct result:
> dims2.rdd.map(row => (row.dimension, row)).join(cent2.rdd.map(row => (row.dimension, row))).take(5)
> res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))
> We've tried changing the output format to ORC instead of Parquet, but we see 
> the same results. Running Spark 2.0 locally, not on a cluster, does not have 
> this issue, and running Spark in local mode on the master node of the Hadoop 
> cluster also works. We see the issue only when running on top of YARN.
> This also seems very similar to this issue: 
> https://issues.apache.org/jira/browse/SPARK-10896
> We have also determined that this appears to be related to the memory 
> settings of the cluster. The worker machines have 56000MB available, the node 
> manager memory is set to 54784M, and executor memory is set to 48407M when we 
> see this issue happen. Lowering the executor memory to something like 28407M 
> makes the issue go away.
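
For anyone re-checking the report on their own cluster, the expected join result can be computed on plain Scala collections and used as a reference. The following is only a minimal sketch, not part of the original report: the case classes and sample rows are copied from the snippet above, while `expectedJoin`, `dimsLocal`, `centLocal` and `expected` are hypothetical names introduced here for illustration.

```scala
// Same case classes as in the report.
case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

// Hypothetical helper: reference inner join on local collections (no Spark involved).
def expectedJoin(
    dims: Seq[UserDimensions],
    cent: Seq[CentroidClusterScore]): Seq[(UserDimensions, CentroidClusterScore)] =
  for {
    d <- dims
    c <- cent
    if d.dimension == c.dimension // same join key as dims("dimension") === cent("dimension")
  } yield (d, c)

// Sample rows taken from the report.
val dimsLocal = Seq(UserDimensions(12345, 0, 1.0))
val centLocal = Seq(
  CentroidClusterScore(0, 1, 1.0),
  CentroidClusterScore(1, 0, 1.0),
  CentroidClusterScore(2, 2, 1.0))

// One matching pair is expected, for dimension 0.
val expected = expectedJoin(dimsLocal, centLocal)
```

In spark-shell, one possible check (an assumption about how one might use this, not something the reporter ran) would be to compare `expected` against `dims2.joinWith(cent2, dims2("dimension") === cent2("dimension")).collect().toSeq`. A mismatch would point at the Dataset join path rather than at the data written to storage, which is consistent with the RDD join above returning the correct pair.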



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
