DataFrame/Dataset join not producing correct results in Spark 2.0/Yarn

2016-10-12 Thread Stephen Hankinson
Hi,

We have a cluster running Apache Spark 2.0 on Hadoop 2.7.2, CentOS 7.2. We
have written some new code using the Spark DataFrame/Dataset APIs, but we
are noticing incorrect results from a join after writing data to and then
reading it back from Windows Azure Storage Blobs (the default HDFS
location). I've been able to reproduce the issue with the following snippet
of code running on the cluster.

case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(
  CentroidClusterScore(0, 1, 1.0),
  CentroidClusterScore(1, 0, 1.0),
  CentroidClusterScore(2, 2, 1.0))).toDS

dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension") ).show

outputs

+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|        0|      1|  1.0|
+-----+---------+-----+---------+-------+-----+

which is correct. However, after writing the data out and reading it back,
we see this:

dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")

val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]

dims2.show
cent2.show

dims2.join(cent2, dims2("dimension") === cent2("dimension") ).show

outputs

+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|     null|   null| null|
+-----+---------+-----+---------+-------+-----+

However, using the RDD API produces the correct result

dims2.rdd.map( row => (row.dimension, row) )
  .join( cent2.rdd.map( row => (row.dimension, row) ) )
  .take(5)

res5: Array[(Long, (UserDimensions, CentroidClusterScore))] =
Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))
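
In case it helps with diagnosis, this is roughly how we would compare the
plans of the working and failing joins (a sketch; explain(true) prints the
parsed, analyzed, optimized, and physical plans):

// Sketch: dump the extended plans before and after the round trip
dims.join(cent, dims("dimension") === cent("dimension")).explain(true)
dims2.join(cent2, dims2("dimension") === cent2("dimension")).explain(true)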

We've tried changing the output format to ORC instead of Parquet, but we
see the same results. Running Spark 2.0 locally, not on a cluster, does not
exhibit this issue, and running Spark in local mode on the master node of
the Hadoop cluster also works. We only see the issue when running on top of
YARN.
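
One workaround we may try (a sketch, not yet verified on our cluster) is
joining on the column name rather than a column expression, which yields a
single dimension column and avoids resolving duplicate column references:

dims2.join(cent2, Seq("dimension")).show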

This also seems very similar to this issue:
https://issues.apache.org/jira/browse/SPARK-10896
Thoughts?


*Stephen Hankinson*


UnknownHostException with Mesos and custom Jar

2015-09-28 Thread Stephen Hankinson
Hi,

I'm wondering if anyone can help me with an issue I am having.

I am receiving an UnknownHostException when running a custom jar with Spark
on Mesos. The issue does not happen when running spark-shell.
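
For reference, the jar is launched roughly as follows (the main class and
jar path below are placeholders, not our actual names):

/spark-1.5.0-bin-hadoop2.6/bin/spark-submit \
  --class com.example.MyJob \
  /path/to/custom-job.jar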

My spark-env.sh contains the following:

export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so

export HADOOP_CONF_DIR=/hadoop-2.7.1/etc/hadoop/

My spark-defaults.conf contains the following:

spark.master   mesos://zk://172.31.0.81:2181,172.31.16.81:2181,172.31.32.81:2181/mesos

spark.mesos.executor.home  /spark-1.5.0-bin-hadoop2.6/

Starting spark-shell as follows and running the following line works
correctly:

/spark-1.5.0-bin-hadoop2.6/bin/spark-shell

sc.textFile("/tmp/Input").collect.foreach(println)

15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(88528) called
with curMem=0, maxMem=556038881

15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 86.5 KB, free 530.2 MB)

15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(20236) called
with curMem=88528, maxMem=556038881

15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 19.8 KB, free 530.2 MB)

15/09/28 20:04:49 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on 172.31.21.104:49048 (size: 19.8 KB, free: 530.3 MB)

15/09/28 20:04:49 INFO spark.SparkContext: Created broadcast 0 from
textFile at <console>:22

15/09/28 20:04:49 INFO mapred.FileInputFormat: Total input paths to process
: 1

15/09/28 20:04:49 INFO spark.SparkContext: Starting job: collect at
<console>:22

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Got job 0 (collect at
<console>:22) with 3 output partitions

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Final stage: ResultStage
0 (collect at <console>:22)

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Parents of final stage:
List()

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Missing parents: List()

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Submitting ResultStage 0
(MapPartitionsRDD[1] at textFile at <console>:22), which has no missing
parents

15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(3120) called
with curMem=108764, maxMem=556038881

15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_1 stored as
values in memory (estimated size 3.0 KB, free 530.2 MB)

15/09/28 20:04:49 INFO storage.MemoryStore: ensureFreeSpace(1784) called
with curMem=111884, maxMem=556038881

15/09/28 20:04:49 INFO storage.MemoryStore: Block broadcast_1_piece0 stored
as bytes in memory (estimated size 1784.0 B, free 530.2 MB)

15/09/28 20:04:49 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on 172.31.21.104:49048 (size: 1784.0 B, free: 530.3 MB)

15/09/28 20:04:49 INFO spark.SparkContext: Created broadcast 1 from
broadcast at DAGScheduler.scala:861

15/09/28 20:04:49 INFO scheduler.DAGScheduler: Submitting 3 missing tasks
from ResultStage 0 (MapPartitionsRDD[1] at textFile at <console>:22)

15/09/28 20:04:49 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0
with 3 tasks

15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, ip-172-31-37-82.us-west-2.compute.internal, NODE_LOCAL, 2142
bytes)

15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
0.0 (TID 1, ip-172-31-21-104.us-west-2.compute.internal, NODE_LOCAL, 2142
bytes)

15/09/28 20:04:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
0.0 (TID 2, ip-172-31-4-4.us-west-2.compute.internal, NODE_LOCAL, 2142
bytes)

15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
block manager ip-172-31-4-4.us-west-2.compute.internal:50648 with 530.3 MB
RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S2,
ip-172-31-4-4.us-west-2.compute.internal, 50648)

15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
block manager ip-172-31-37-82.us-west-2.compute.internal:52624 with 530.3
MB RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S1,
ip-172-31-37-82.us-west-2.compute.internal, 52624)

15/09/28 20:04:52 INFO storage.BlockManagerMasterEndpoint: Registering
block manager ip-172-31-21-104.us-west-2.compute.internal:56628 with 530.3
MB RAM, BlockManagerId(20150928-190245-1358962604-5050-11297-S0,
ip-172-31-21-104.us-west-2.compute.internal, 56628)

15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on ip-172-31-37-82.us-west-2.compute.internal:52624 (size: 1784.0
B, free: 530.3 MB)

15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on ip-172-31-21-104.us-west-2.compute.internal:56628 (size:
1784.0 B, free: 530.3 MB)

15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0
in memory on ip-172-31-4-4.us-west-2.compute.internal:50648 (size: 1784.0
B, free: 530.3 MB)

15/09/28 20:04:52 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on ip-172-31-37-82.us-west-2.compute.internal:52624 (size: 19.8
KB, free: 530.3 MB)