[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-25 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15392680#comment-15392680
 ] 

Shubham Chopra commented on SPARK-16550:


[~rxin] I did some basic testing using the patch, and it shows the desired 
replication. Thanks [~ekhliang]!

The patch, however, does fail some unit tests in DistributedSuite.

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>Assignee: Josh Rosen
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> {noformat}
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> {noformat}
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> {noformat}
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> {noformat}
> works as intended.
> But 
> {noformat}
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> {noformat}
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-21 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388785#comment-15388785
 ] 

Reynold Xin commented on SPARK-16550:
-

[~shubhamc] can you test the patch [~ekhliang] submitted?

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>Assignee: Josh Rosen
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> {noformat}
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> {noformat}
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> {noformat}
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> {noformat}
> works as intended.
> But 
> {noformat}
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> {noformat}
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-21 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15388638#comment-15388638
 ] 

Apache Spark commented on SPARK-16550:
--

User 'ericl' has created a pull request for this issue:
https://github.com/apache/spark/pull/14311

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>Assignee: Josh Rosen
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> {noformat}
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> {noformat}
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> {noformat}
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> {noformat}
> works as intended.
> But 
> {noformat}
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> {noformat}
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-15 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15379793#comment-15379793
 ] 

Josh Rosen commented on SPARK-16550:


I have a partial fix for the "unable to replicate integers" issue, including an 
enhancement to the caching tests in DistributedSuite to make them capable of 
catching this bug: https://github.com/JoshRosen/spark/tree/SPARK-16550

All that remains is to fix the ClassTag + Classloading issue so that this also 
works for REPL-defined classes (or, alternatively, just erase the caching 
ClassTag to ClassTag\[Any] since we're just going to use the default 
serializer).

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>Assignee: Josh Rosen
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> {noformat}
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> {noformat}
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> {noformat}
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> {noformat}
> works as intended.
> But 
> {noformat}
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> {noformat}
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378593#comment-15378593
 ] 

Josh Rosen commented on SPARK-16550:


*For the case involving a REPL-defined class:* The ClassNotFoundException is 
occurring because the NettyBlockRpcServer is attempting to deserialize a 
ClassTag field and that ClassTag references a class which is only present in 
the REPL classloader and not in the executor JVM's default classloader. One 
potential solution is to use the executor's REPL / context classloader for 
deserializing ClassTags, but this might be slightly non-straightforward due to 
some complicated initialization ordering dependencies.

*For the case involving integers:* I omitted a classTag inside a call in 
{{doGetLocalBytes()}}, which is called as part of the write-side of the block 
replication code.

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> {noformat}
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> {noformat}
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> {noformat}
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> {noformat}
> works as intended.
> But 
> {noformat}
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> {noformat}
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378527#comment-15378527
 ] 

Reynold Xin commented on SPARK-16550:
-

cc [~joshrosen] who wrote SPARK-13990.


> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> {noformat}
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> {noformat}
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> {noformat}
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> {noformat}
> works as intended.
> But 
> {noformat}
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> {noformat}
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378478#comment-15378478
 ] 

Shubham Chopra commented on SPARK-16550:


Example code: 
```
case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
println(data.count)
println(s"Total number of blocks in data: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataTestInt.count)
println(s"Total number of blocks in dataTestInt: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataInteger.count)
println(s"Total number of blocks in dataInteger: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

val dataSerialized = sc.parallelize(1 to 1000, 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataSerialized.count)
println(s"Total number of blocks in dataSerialized: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataTestIntSer.count)
println(s"Total number of blocks in dataTestIntSer: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")

  }
}
```

Output:
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10

The issue exists when I submit a compiled program as well. The exception stack 
traces are similar to the ones posted above.

I think part of the problem might be related to 
https://issues.apache.org/jira/browse/SPARK-13990

This code works fine in Spark 1.6.2, both in the shell and when submitted as 
compiled code.

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> works as intended.
> But 
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 

[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378167#comment-15378167
 ] 

Sean Owen commented on SPARK-16550:
---

You have a shell-related problem here that caused the failure. It has nothing 
to do with replication. Basically, you'll find that case classes in the shell 
don't necessarily work in all cases. These are more functions of how Scala 
works and not Spark issues per se. Compiled programs will work fine. I'll close 
this.

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>Priority: Blocker
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> works as intended.
> But 
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org