[jira] [Updated] (SPARK-21344) BinaryType comparison does signed byte array comparison
[ https://issues.apache.org/jira/browse/SPARK-21344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shubham Chopra updated SPARK-21344: --- Priority: Blocker (was: Major) > BinaryType comparison does signed byte array comparison > --- > > Key: SPARK-21344 > URL: https://issues.apache.org/jira/browse/SPARK-21344 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Shubham Chopra >Priority: Blocker > > BinaryType used by Spark SQL defines ordering using signed byte comparisons. > This can lead to unexpected behavior. Consider the following code snippet > that shows this error: > {code} > case class TestRecord(col0: Array[Byte]) > def convertToBytes(i: Long): Array[Byte] = { > val bb = java.nio.ByteBuffer.allocate(8) > bb.putLong(i) > bb.array > } > def test = { > val sql = spark.sqlContext > import sql.implicits._ > val timestamp = 1498772083037L > val data = (timestamp to timestamp + 1000L).map(i => > TestRecord(convertToBytes(i))) > val testDF = sc.parallelize(data).toDF > val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && > col("col0") < convertToBytes(timestamp + 50L)) > val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + > 50L) && col("col0") < convertToBytes(timestamp + 100L)) > val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && > col("col0") < convertToBytes(timestamp + 100L)) > assert(filter1.count == 50) > assert(filter2.count == 50) > assert(filter3.count == 100) > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21344) BinaryType comparison does signed byte array comparison
Shubham Chopra created SPARK-21344: -- Summary: BinaryType comparison does signed byte array comparison Key: SPARK-21344 URL: https://issues.apache.org/jira/browse/SPARK-21344 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.1 Reporter: Shubham Chopra BinaryType used by Spark SQL defines ordering using signed byte comparisons. This can lead to unexpected behavior. Consider the following code snippet that shows this error: {code:scala} case class TestRecord(col0: Array[Byte]) def convertToBytes(i: Long): Array[Byte] = { val bb = java.nio.ByteBuffer.allocate(8) bb.putLong(i) bb.array } def test = { val sql = spark.sqlContext import sql.implicits._ val timestamp = 1498772083037L val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i))) val testDF = sc.parallelize(data).toDF val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 50L)) val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) && col("col0") < convertToBytes(timestamp + 100L)) val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 100L)) assert(filter1.count == 50) assert(filter2.count == 50) assert(filter3.count == 100) } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
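For context, the root cause can be reproduced outside Spark: JVM bytes are signed, so a lexicographic comparison over raw bytes orders 0x80–0xFF before 0x00–0x7F. A minimal sketch in plain Java (illustrative only, not Spark code; `signedCompare`/`unsignedCompare` are hypothetical helpers) contrasting the two orderings over the same big-endian long encodings used in the report:

```java
import java.nio.ByteBuffer;

public class ByteCompareDemo {
    // Signed lexicographic comparison (mirrors the ordering the report describes).
    static int signedCompare(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            if (a[i] != b[i]) return Byte.compare(a[i], b[i]);
        }
        return Integer.compare(a.length, b.length);
    }

    // Unsigned lexicographic comparison (the ordering callers of BinaryType expect).
    static int unsignedCompare(byte[] a, byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            if (a[i] != b[i]) return (a[i] & 0xFF) - (b[i] & 0xFF);
        }
        return Integer.compare(a.length, b.length);
    }

    // Same encoding as convertToBytes in the report: 8-byte big-endian long.
    static byte[] toBytes(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }

    public static void main(String[] args) {
        long t = 1498772083037L; // timestamp from the report
        // Count adjacent pairs in t..t+1000 whose encodings the two orderings disagree on.
        // Disagreements occur exactly where the low byte crosses 0x7F -> 0x80.
        int disagreements = 0;
        for (long i = t; i < t + 1000; i++) {
            byte[] lo = toBytes(i), hi = toBytes(i + 1);
            if ((signedCompare(lo, hi) < 0) != (unsignedCompare(lo, hi) < 0)) disagreements++;
        }
        System.out.println("orderings disagree on " + disagreements + " adjacent pairs");
    }
}
```

Each such crossing splits a contiguous timestamp range into pieces that a signed range filter cannot see as contiguous, which is why the filter counts in the snippet above come out wrong.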
[jira] [Updated] (SPARK-21344) BinaryType comparison does signed byte array comparison
[ https://issues.apache.org/jira/browse/SPARK-21344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shubham Chopra updated SPARK-21344: --- Description: BinaryType used by Spark SQL defines ordering using signed byte comparisons. This can lead to unexpected behavior. Consider the following code snippet that shows this error: {code} case class TestRecord(col0: Array[Byte]) def convertToBytes(i: Long): Array[Byte] = { val bb = java.nio.ByteBuffer.allocate(8) bb.putLong(i) bb.array } def test = { val sql = spark.sqlContext import sql.implicits._ val timestamp = 1498772083037L val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i))) val testDF = sc.parallelize(data).toDF val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 50L)) val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) && col("col0") < convertToBytes(timestamp + 100L)) val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 100L)) assert(filter1.count == 50) assert(filter2.count == 50) assert(filter3.count == 100) } {code} was: BinaryType used by Spark SQL defines ordering using signed byte comparisons. This can lead to unexpected behavior. 
Consider the following code snippet that shows this error: {code:scala} case class TestRecord(col0: Array[Byte]) def convertToBytes(i: Long): Array[Byte] = { val bb = java.nio.ByteBuffer.allocate(8) bb.putLong(i) bb.array } def test = { val sql = spark.sqlContext import sql.implicits._ val timestamp = 1498772083037L val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i))) val testDF = sc.parallelize(data).toDF val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 50L)) val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) && col("col0") < convertToBytes(timestamp + 100L)) val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 100L)) assert(filter1.count == 50) assert(filter2.count == 50) assert(filter3.count == 100) } {code} > BinaryType comparison does signed byte array comparison > --- > > Key: SPARK-21344 > URL: https://issues.apache.org/jira/browse/SPARK-21344 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1 >Reporter: Shubham Chopra > > BinaryType used by Spark SQL defines ordering using signed byte comparisons. > This can lead to unexpected behavior. 
Consider the following code snippet > that shows this error: > {code} > case class TestRecord(col0: Array[Byte]) > def convertToBytes(i: Long): Array[Byte] = { > val bb = java.nio.ByteBuffer.allocate(8) > bb.putLong(i) > bb.array > } > def test = { > val sql = spark.sqlContext > import sql.implicits._ > val timestamp = 1498772083037L > val data = (timestamp to timestamp + 1000L).map(i => > TestRecord(convertToBytes(i))) > val testDF = sc.parallelize(data).toDF > val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && > col("col0") < convertToBytes(timestamp + 50L)) > val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + > 50L) && col("col0") < convertToBytes(timestamp + 100L)) > val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && > col("col0") < convertToBytes(timestamp + 100L)) > assert(filter1.count == 50) > assert(filter2.count == 50) > assert(filter3.count == 100) > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-15352) Topology aware block replication
[ https://issues.apache.org/jira/browse/SPARK-15352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shubham Chopra resolved SPARK-15352. Resolution: Fixed > Topology aware block replication > > > Key: SPARK-15352 > URL: https://issues.apache.org/jira/browse/SPARK-15352 > Project: Spark > Issue Type: New Feature > Components: Block Manager, Mesos, Spark Core, YARN >Reporter: Shubham Chopra >Assignee: Shubham Chopra > > With cached RDDs, Spark can be used for online analytics where it is used to > respond to online queries. But loss of RDD partitions due to node/executor > failures can cause huge delays in such use cases as the data would have to be > regenerated. > Cached RDDs, even when using multiple replicas per block, are not currently > resilient to node failures when multiple executors are started on the same > node. Block replication currently chooses a peer at random, and this peer > could also exist on the same host. > This effort would add topology aware replication to Spark that can be enabled > with pluggable strategies. For ease of development/review, this is being > broken down to three major work-efforts: > 1.Making peer selection for replication pluggable > 2.Providing pluggable implementations for providing topology and topology > aware replication > 3.Pro-active replenishment of lost blocks -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
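The HDFS-style placement described above (first replica on the executor, second in the same rack, third off-rack) can be sketched as a peer-prioritization step. This is a hypothetical illustration in Java, not Spark's BlockManager code; `Peer` stands in for a BlockManagerId and all names are illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class RackAwarePrioritizer {
    // Hypothetical stand-in for a BlockManagerId; only host and rack matter here.
    record Peer(String host, String rack) {}

    /**
     * Orders candidate peers relative to the executor holding the first replica:
     * same-rack peers on other hosts first, then off-rack peers, then same-host
     * peers as a last resort (random within each tier).
     */
    static List<Peer> prioritize(Peer self, List<Peer> peers, long seed) {
        List<Peer> shuffled = new ArrayList<>(peers);
        Collections.shuffle(shuffled, new Random(seed));
        List<Peer> sameRack = new ArrayList<>(), offRack = new ArrayList<>(), sameHost = new ArrayList<>();
        for (Peer p : shuffled) {
            if (p.host().equals(self.host())) sameHost.add(p);          // vulnerable to node loss
            else if (p.rack().equals(self.rack())) sameRack.add(p);     // second replica target
            else offRack.add(p);                                        // third replica target
        }
        List<Peer> ordered = new ArrayList<>(sameRack);
        ordered.addAll(offRack);
        ordered.addAll(sameHost);
        return ordered;
    }

    public static void main(String[] args) {
        Peer self = new Peer("host1", "rackA");
        List<Peer> peers = List.of(
            new Peer("host1", "rackA"),   // second executor on the same node
            new Peer("host2", "rackA"),
            new Peer("host3", "rackB"));
        // Replicating to the first two peers yields one same-rack and one off-rack copy.
        for (Peer p : prioritize(self, peers, 42L)) System.out.println(p.host());
    }
}
```

Note how this avoids the failure mode the ticket calls out: a random peer choice could place both replicas on `host1`, while the tiered ordering only falls back to a same-host peer when nothing else is available.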
[jira] [Updated] (SPARK-20902) Word2Vec implementations with Negative Sampling
[ https://issues.apache.org/jira/browse/SPARK-20902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shubham Chopra updated SPARK-20902: --- Description: Spark MLlib Word2Vec currently only implements Skip-Gram+Hierarchical softmax. Both Continuous bag of words (CBOW) and SkipGram have shown comparable or better performance with Negative Sampling. This umbrella JIRA is to keep track of the effort to add negative sampling based implementations of both CBOW and SkipGram models to Spark MLlib. Since word2vec is largely a pre-processing step, performance can often depend on the application it is being used for, and the corpus it is estimated on. These implementations give users the choice of picking the one that works best for their use-case. was:Spark MLlib Word2Vec currently only implements Skip-Gram+Hierarchical softmax. Both Continuous bag of words (CBOW) and SkipGram have shown comparable or better performance with Negative Sampling. This umbrella JIRA is to keep track of the effort to add negative sampling based implementations of both CBOW and SkipGram models to Spark MLlib. > Word2Vec implementations with Negative Sampling > --- > > Key: SPARK-20902 > URL: https://issues.apache.org/jira/browse/SPARK-20902 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.1.1 >Reporter: Shubham Chopra > Labels: ML > > Spark MLlib Word2Vec currently only implements Skip-Gram+Hierarchical > softmax. Both Continuous bag of words (CBOW) and SkipGram have shown > comparable or better performance with Negative Sampling. This umbrella JIRA > is to keep track of the effort to add negative sampling based > implementations of both CBOW and SkipGram models to Spark MLlib. > Since word2vec is largely a pre-processing step, performance can often > depend on the application it is being used for, and the corpus it is > estimated on. 
These implementations give users the choice of picking the one that > works best for their use-case. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20903) Word2Vec Skip-Gram + Negative Sampling
Shubham Chopra created SPARK-20903: -- Summary: Word2Vec Skip-Gram + Negative Sampling Key: SPARK-20903 URL: https://issues.apache.org/jira/browse/SPARK-20903 Project: Spark Issue Type: Sub-task Components: ML, MLlib Affects Versions: 2.1.1 Reporter: Shubham Chopra SkipGram + Negative Sampling has been shown to be comparable to, or to out-perform, the hierarchical softmax based approach currently implemented in Spark. Since word2vec is largely a pre-processing step, performance can often depend on the application it is being used for, and the corpus it is estimated on. These implementations give users the choice of picking the one that works best for their use-case. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
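For readers unfamiliar with the technique, negative sampling replaces the hierarchical-softmax normalization with a handful of "noise" words drawn from the unigram distribution raised to the 0.75 power (the exponent used in the original word2vec C implementation). A minimal, self-contained sketch of that noise distribution in Java (illustrative only; `NegativeSampler` is a hypothetical name, not part of this proposal's API):

```java
import java.util.Arrays;
import java.util.Random;

public class NegativeSampler {
    private final int[] table; // word ids repeated in proportion to freq^0.75
    private final Random rng;

    // counts[w] = corpus frequency of word id w.
    NegativeSampler(int[] counts, int tableSize, long seed) {
        double norm = 0;
        double[] pow = new double[counts.length];
        for (int w = 0; w < counts.length; w++) {
            pow[w] = Math.pow(counts[w], 0.75); // flatten the unigram distribution
            norm += pow[w];
        }
        // Fill a lookup table so that uniform indexing samples from the distribution.
        table = new int[tableSize];
        int w = 0;
        double cum = pow[0] / norm;
        for (int i = 0; i < tableSize; i++) {
            table[i] = w;
            if ((i + 1) / (double) tableSize > cum && w < counts.length - 1) {
                w++;
                cum += pow[w] / norm;
            }
        }
        rng = new Random(seed);
    }

    /** Draw k negative word ids, skipping the positive target. */
    int[] sample(int positive, int k) {
        int[] negs = new int[k];
        for (int i = 0; i < k; i++) {
            int neg;
            do { neg = table[rng.nextInt(table.length)]; } while (neg == positive);
            negs[i] = neg;
        }
        return negs;
    }

    public static void main(String[] args) {
        // Toy vocabulary: word 0 is 16x more frequent than word 2.
        NegativeSampler s = new NegativeSampler(new int[]{1600, 400, 100}, 100_000, 7L);
        System.out.println(Arrays.toString(s.sample(1, 5)));
    }
}
```

The training step would then push the model to score the positive pair above each sampled negative; only the sampling side is sketched here.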
[jira] [Updated] (SPARK-20372) Word2Vec Continuous Bag Of Words model
[ https://issues.apache.org/jira/browse/SPARK-20372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shubham Chopra updated SPARK-20372: --- Issue Type: Sub-task (was: Improvement) Parent: SPARK-20902 > Word2Vec Continuous Bag Of Words model > -- > > Key: SPARK-20372 > URL: https://issues.apache.org/jira/browse/SPARK-20372 > Project: Spark > Issue Type: Sub-task > Components: ML >Affects Versions: 2.1.0 >Reporter: Shubham Chopra > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20902) Word2Vec implementations with Negative Sampling
Shubham Chopra created SPARK-20902: -- Summary: Word2Vec implementations with Negative Sampling Key: SPARK-20902 URL: https://issues.apache.org/jira/browse/SPARK-20902 Project: Spark Issue Type: Improvement Components: ML, MLlib Affects Versions: 2.1.1 Reporter: Shubham Chopra Spark MLlib Word2Vec currently only implements Skip-Gram+Hierarchical softmax. Both Continuous bag of words (CBOW) and SkipGram have shown comparable or better performance with Negative Sampling. This umbrella JIRA is to keep track of the effort to add negative sampling based implementations of both CBOW and SkipGram models to Spark MLlib. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20372) Word2Vec Continuous Bag Of Words model
Shubham Chopra created SPARK-20372: -- Summary: Word2Vec Continuous Bag Of Words model Key: SPARK-20372 URL: https://issues.apache.org/jira/browse/SPARK-20372 Project: Spark Issue Type: Improvement Components: ML Affects Versions: 2.1.0 Reporter: Shubham Chopra Fix For: 2.2.0 -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19803) Flaky BlockManagerProactiveReplicationSuite tests
[ https://issues.apache.org/jira/browse/SPARK-19803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943460#comment-15943460 ] Shubham Chopra commented on SPARK-19803: The PR enforces a refresh of the peer list cached at the executor that is trying to proactively replicate the block. This fix ensures that the peer will never try to replicate to a previously failed executor due to a stale reference. In addition, in the unit test, the block managers are explicitly stopped when they are being removed from the master. > Flaky BlockManagerProactiveReplicationSuite tests > - > > Key: SPARK-19803 > URL: https://issues.apache.org/jira/browse/SPARK-19803 > Project: Spark > Issue Type: Bug > Components: Spark Core, Tests >Affects Versions: 2.2.0 >Reporter: Sital Kedia >Assignee: Shubham Chopra > Labels: flaky-test > Fix For: 2.2.0 > > > The tests added for BlockManagerProactiveReplicationSuite has made the > jenkins build flaky. Please refer to the build for more details - > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73640/testReport/ -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
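The stale-reference failure mode the comment describes can be illustrated abstractly: an executor caches the peer list from the driver, a peer dies, and replication keeps targeting the dead peer until the cache is refreshed. A hypothetical sketch (not Spark's BlockManager code; `PeerListCache` and its method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class PeerListCache {
    private final Supplier<List<String>> fetchFromMaster; // driver-side source of truth
    private List<String> cached = null;

    PeerListCache(Supplier<List<String>> fetchFromMaster) {
        this.fetchFromMaster = fetchFromMaster;
    }

    // forceFetch=true models the fix: refresh from the master before replicating.
    List<String> peers(boolean forceFetch) {
        if (forceFetch || cached == null) cached = new ArrayList<>(fetchFromMaster.get());
        return cached;
    }

    public static void main(String[] args) {
        List<String> live = new ArrayList<>(List.of("exec-1", "exec-2"));
        PeerListCache cache = new PeerListCache(() -> live);

        cache.peers(false);        // caches [exec-1, exec-2]
        live.remove("exec-2");     // exec-2 fails after the list was cached

        // Without a forced refresh, replication would still target the dead peer:
        System.out.println(cache.peers(false).contains("exec-2")); // true: stale
        // Forcing a refresh drops the dead peer before proactive replication:
        System.out.println(cache.peers(true).contains("exec-2"));  // false
    }
}
```

This also motivates the test-side change mentioned above: stopping block managers explicitly when they are removed from the master makes the "dead peer" state deterministic instead of racy.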
[jira] [Commented] (SPARK-19803) Flaky BlockManagerProactiveReplicationSuite tests
[ https://issues.apache.org/jira/browse/SPARK-19803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938676#comment-15938676 ] Shubham Chopra commented on SPARK-19803: Any feedback on the PR - https://github.com/apache/spark/pull/17325 ? > Flaky BlockManagerProactiveReplicationSuite tests > - > > Key: SPARK-19803 > URL: https://issues.apache.org/jira/browse/SPARK-19803 > Project: Spark > Issue Type: Bug > Components: Spark Core, Tests >Affects Versions: 2.2.0 >Reporter: Sital Kedia >Assignee: Shubham Chopra > Labels: flaky-test > Fix For: 2.2.0 > > > The tests added for BlockManagerProactiveReplicationSuite have made the > Jenkins build flaky. Please refer to the build for more details - > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73640/testReport/ -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-16878) YARN - topology information
[ https://issues.apache.org/jira/browse/SPARK-16878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shubham Chopra closed SPARK-16878. -- Resolution: Unresolved Withdrawing issue. As a stop-gap, a file based topology mapper can be used where needed. > YARN - topology information > > > Key: SPARK-16878 > URL: https://issues.apache.org/jira/browse/SPARK-16878 > Project: Spark > Issue Type: Sub-task > Components: YARN >Reporter: Shubham Chopra > > Block replication strategies need topology information for ideal block > placements. This information is available in resource managers and/or can be > provided separately through scripts/services/classes, as in the case of HDFS. > This jira focuses on enhancing spark-yarn package to suitably extract > topology information from YARN. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19803) Flaky BlockManagerProactiveReplicationSuite tests
[ https://issues.apache.org/jira/browse/SPARK-19803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927396#comment-15927396 ] Shubham Chopra commented on SPARK-19803: I am looking into this and will try to submit a fix in a day or so. Mostly trying to isolate the race condition and simplify the test cases. > Flaky BlockManagerProactiveReplicationSuite tests > - > > Key: SPARK-19803 > URL: https://issues.apache.org/jira/browse/SPARK-19803 > Project: Spark > Issue Type: Bug > Components: Spark Core, Tests >Affects Versions: 2.2.0 >Reporter: Sital Kedia >Assignee: Shubham Chopra > Labels: flaky-test > Fix For: 2.2.0 > > > The tests added for BlockManagerProactiveReplicationSuite has made the > jenkins build flaky. Please refer to the build for more details - > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73640/testReport/ -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15354) Topology aware block replication strategies
[ https://issues.apache.org/jira/browse/SPARK-15354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411903#comment-15411903 ] Shubham Chopra commented on SPARK-15354: This is a part of the larger umbrella jira [SPARK-15352|https://issues.apache.org/jira/browse/SPARK-15352]. We envision using Spark to respond to online queries at near real-time, with data cached in RDDs/DataFrames. In case of failures, where blocks of data backing an RDD/DataFrame are lost, regenerating data at "query time" would result in a significant hit on query performance. The lineage will ensure there is no data-loss, but we also need to ensure query performance doesn't degrade. The cached data, therefore, needs to be fail-safe to some extent. This problem is very similar to HDFS replication which results in a higher read traffic and high availability. When run on Yarn, Spark executors are housed inside Yarn containers and there can be multiple containers on the same node. With random block placement, if both replicas of a block end up on the same node, you are susceptible to node loss. An HDFS like approach, would be a very basic way of addressing this. Note that the hosts chosen through this would also be random. Depending on the replication objectives to be met depending on different topologies, a more general approach would work better. > Topology aware block replication strategies > --- > > Key: SPARK-15354 > URL: https://issues.apache.org/jira/browse/SPARK-15354 > Project: Spark > Issue Type: Sub-task > Components: Mesos, Spark Core, YARN >Reporter: Shubham Chopra > > Implementations of strategies for resilient block replication for different > resource managers that replicate the 3-replica strategy used by HDFS, where > the first replica is on an executor, the second replica within the same rack > as the executor and a third replica on a different rack. 
> The implementation involves providing two pluggable classes, one running in > the driver that provides topology information for every host at cluster start > and the second prioritizing a list of peer BlockManagerIds. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-16878) YARN - topology information
Shubham Chopra created SPARK-16878: -- Summary: YARN - topology information Key: SPARK-16878 URL: https://issues.apache.org/jira/browse/SPARK-16878 Project: Spark Issue Type: Sub-task Components: YARN Reporter: Shubham Chopra Block replication strategies need topology information for ideal block placements. This information is available in resource managers and/or can be provided separately through scripts/services/classes, as in the case of HDFS. This jira focuses on enhancing spark-yarn package to suitably extract topology information from YARN. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data
[ https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15392680#comment-15392680 ] Shubham Chopra commented on SPARK-16550: [~rxin] I did some basic testing using the patch, and it shows the desired replication. Thanks [~ekhliang]! The patch, however, does fail some unit tests in DistributedSuite. > Caching data with replication doesn't replicate data > > > Key: SPARK-16550 > URL: https://issues.apache.org/jira/browse/SPARK-16550 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 2.0.0 >Reporter: Shubham Chopra >Assignee: Josh Rosen > > Caching multiple replicas of blocks is currently broken. The following > examples show replication doesn't happen for various use-cases: > These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode > {noformat} > case class TestInteger(i: Int) > val data = sc.parallelize((1 to 1000).map(TestInteger(_)), > 10).persist(MEMORY_ONLY_2) > data.count > {noformat} > sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows > only 10 blocks as opposed to the expected 20 > Block replication fails on the executors with a java.lang.RuntimeException: > java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger > {noformat} > val data1 = sc.parallelize(1 to 1000, > 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2) > data1.count > Block replication again fails with the following errors: > 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking > RpcHandler#receive() on RPC id 8567643992794608648 > com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: > 13994 > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) > at > 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753) > {noformat} > sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum > again shows 10 blocks > Caching serialized data works for native types, but not for custom classes > {noformat} > val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2) > data3.count > {noformat} > works as intended. > But > {noformat} > val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), > 10).persist(MEMORY_ONLY_SER_2) > data4.count > {noformat} > Again doesn't replicate data and executors show the same > ClassNotFoundException > These examples worked fine and showed expected results with Spark 1.6.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-16550) Caching data with replication doesn't replicate data
[ https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shubham Chopra updated SPARK-16550: --- Priority: Blocker (was: Major) > Caching data with replication doesn't replicate data > > > Key: SPARK-16550 > URL: https://issues.apache.org/jira/browse/SPARK-16550 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 2.0.0 >Reporter: Shubham Chopra >Priority: Blocker > > Caching multiple replicas of blocks is currently broken. The following > examples show replication doesn't happen for various use-cases: > These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode > case class TestInteger(i: Int) > val data = sc.parallelize((1 to 1000).map(TestInteger(_)), > 10).persist(MEMORY_ONLY_2) > data.count > sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows > only 10 blocks as opposed to the expected 20 > Block replication fails on the executors with a java.lang.RuntimeException: > java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger > val data1 = sc.parallelize(1 to 1000, > 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2) > data1.count > Block replication again fails with the following errors: > 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking > RpcHandler#receive() on RPC id 8567643992794608648 > com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: > 13994 > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753) > sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum > again shows 10 blocks > Caching serialized data works for native types, but not for custom classes > val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2) > data3.count > works as intended. > But > val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), > 10).persist(MEMORY_ONLY_SER_2) > data4.count > Again doesn't replicate data and executors show the same > ClassNotFoundException > These examples worked fine and showed expected results with Spark 1.6.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-16550) Caching data with replication doesn't replicate data
[ https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378478#comment-15378478 ] Shubham Chopra edited comment on SPARK-16550 at 7/14/16 10:08 PM: -- Example code:
{noformat}
case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
    val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(data.count)
    println(s"Total number of blocks in data: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

    val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataTestInt.count)
    println(s"Total number of blocks in dataTestInt: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

    val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataInteger.count)
    println(s"Total number of blocks in dataInteger: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

    val dataSerialized = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataSerialized.count)
    println(s"Total number of blocks in dataSerialized: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

    val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataTestIntSer.count)
    println(s"Total number of blocks in dataTestIntSer: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")
  }
}
{noformat}
Output:
{quote}
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10
{quote}
The issue exists when I submit a compiled program as well. The exception stack traces are similar to the ones posted above. I think part of the problem might be related to [SPARK-13990|https://issues.apache.org/jira/browse/SPARK-13990]. This code works fine in Spark 1.6.2, both in the shell and when submitted as compiled code.
> Caching data with replication doesn't replicate data > > > Key: SPARK-16550 > URL: https://issues.apache.org/jira/browse/SPARK-16550 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions:
[jira] [Comment Edited] (SPARK-16550) Caching data with replication doesn't replicate data
[ https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378478#comment-15378478 ] Shubham Chopra edited comment on SPARK-16550 at 7/14/16 10:05 PM: -- Example code:
{noformat}
case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
    val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(data.count)
    println(s"Total number of blocks in data: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

    val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataTestInt.count)
    println(s"Total number of blocks in dataTestInt: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

    val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataInteger.count)
    println(s"Total number of blocks in dataInteger: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

    val dataSerialized = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataSerialized.count)
    println(s"Total number of blocks in dataSerialized: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

    val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataTestIntSer.count)
    println(s"Total number of blocks in dataTestIntSer: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")
  }
}
{noformat}
Output:
{quote}
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10
{quote}
The issue exists when I submit a compiled program as well. The exception stack traces are similar to the ones posted above. I think part of the problem might be related to https://issues.apache.org/jira/browse/SPARK-13990. This code works fine in Spark 1.6.2, both in the shell and when submitted as compiled code.
> Caching data with replication doesn't replicate data > > > Key: SPARK-16550 > URL: https://issues.apache.org/jira/browse/SPARK-16550 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 2.0.0 >Reporter: Shubham Chopra >
[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data
[ https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378478#comment-15378478 ] Shubham Chopra commented on SPARK-16550: Example code:
{noformat}
case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
    val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(data.count)
    println(s"Total number of blocks in data: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

    val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataTestInt.count)
    println(s"Total number of blocks in dataTestInt: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

    val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataInteger.count)
    println(s"Total number of blocks in dataInteger: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

    val dataSerialized = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataSerialized.count)
    println(s"Total number of blocks in dataSerialized: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

    val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataTestIntSer.count)
    println(s"Total number of blocks in dataTestIntSer: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")
  }
}
{noformat}
Output:
{quote}
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10
{quote}
The issue exists when I submit a compiled program as well.
The exception stack traces are similar to the ones posted above. I think part of the problem might be related to https://issues.apache.org/jira/browse/SPARK-13990 This code works fine in Spark 1.6.2, both in the shell and when submitted as compiled code. > Caching data with replication doesn't replicate data > > > Key: SPARK-16550 > URL: https://issues.apache.org/jira/browse/SPARK-16550 > Project: Spark > Issue Type: Bug > Components: Block Manager, Spark Core >Affects Versions: 2.0.0 >Reporter: Shubham Chopra > > Caching multiple replicas of blocks is currently broken. The following > examples show replication doesn't happen for various use-cases: > These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode > case class TestInteger(i: Int) > val data = sc.parallelize((1 to 1000).map(TestInteger(_)), > 10).persist(MEMORY_ONLY_2) > data.count > sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows > only 10 blocks as opposed to the expected 20 > Block replication fails on the executors with a java.lang.RuntimeException: > java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger > val data1 = sc.parallelize(1 to 1000, > 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2) > data1.count > Block replication again fails with the following errors: > 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking > RpcHandler#receive() on RPC id 8567643992794608648 > com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: > 13994 > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) > at 
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753) > sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum > again shows 10 blocks > Caching serialized data works for native types, but not for custom classes > val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2) > data3.count > works as intended. > But > val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), >
[jira] [Created] (SPARK-16550) Caching data with replication doesn't replicate data
Shubham Chopra created SPARK-16550: -- Summary: Caching data with replication doesn't replicate data Key: SPARK-16550 URL: https://issues.apache.org/jira/browse/SPARK-16550 Project: Spark Issue Type: Bug Components: Block Manager, Spark Core Affects Versions: 2.0.0 Reporter: Shubham Chopra Priority: Blocker
Caching multiple replicas of blocks is currently broken. The following examples show replication doesn't happen for various use-cases. These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode:
{noformat}
case class TestInteger(i: Int)
val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_2)
data.count
sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum
{noformat}
shows only 10 blocks as opposed to the expected 20. Block replication fails on the executors with a java.lang.RuntimeException: java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
{noformat}
val data1 = sc.parallelize(1 to 1000, 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
data1.count
{noformat}
Block replication again fails with the following errors:
{noformat}
16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8567643992794608648
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
{noformat}
sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum again shows 10 blocks.
Caching serialized data works for native types, but not for custom classes:
{noformat}
val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
data3.count
{noformat}
works as intended. But
{noformat}
val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_SER_2)
data4.count
{noformat}
again doesn't replicate data, and the executors show the same ClassNotFoundException. These examples worked fine and showed expected results with Spark 1.6.2. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-15355) Pro-active block replenishment in case of node/executor failures
Shubham Chopra created SPARK-15355: -- Summary: Pro-active block replenishment in case of node/executor failures Key: SPARK-15355 URL: https://issues.apache.org/jira/browse/SPARK-15355 Project: Spark Issue Type: Sub-task Components: Block Manager, Spark Core Reporter: Shubham Chopra Spark currently does not replenish lost replicas. For resiliency and high availability, the BlockManagerMasterEndpoint can proactively verify that all cached RDDs have enough replicas, and replenish them when they don't. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
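The replenishment check described above could be sketched roughly as follows. This is an illustrative model only, not Spark's actual implementation: BlockInfo, findDeficits, and the executor-id strings are hypothetical stand-ins for the state the BlockManagerMasterEndpoint tracks.

```scala
// Hedged sketch: models the proactive replica check, not Spark's real code.
// BlockInfo and findDeficits are illustrative names.
case class BlockInfo(blockId: String, desiredReplicas: Int, holders: Set[String])

object ReplicaMonitor {
  // For each under-replicated block, report how many new copies are needed
  // and which live executors could host them (current holders excluded).
  def findDeficits(blocks: Seq[BlockInfo],
                   liveExecutors: Set[String]): Seq[(BlockInfo, Int, Set[String])] =
    for {
      b <- blocks
      alive = b.holders.intersect(liveExecutors) // replicas that survived failures
      deficit = b.desiredReplicas - alive.size
      if deficit > 0
    } yield (b, deficit, liveExecutors -- alive)

  def main(args: Array[String]): Unit = {
    val blocks = Seq(
      BlockInfo("rdd_0_0", desiredReplicas = 2, holders = Set("exec-1", "exec-2")),
      BlockInfo("rdd_0_1", desiredReplicas = 2, holders = Set("exec-2", "exec-3")))
    val live = Set("exec-1", "exec-2") // exec-3 has died
    findDeficits(blocks, live).foreach { case (b, n, hosts) =>
      println(s"${b.blockId} needs $n more replica(s); candidates: ${hosts.mkString(", ")}")
    }
  }
}
```

Running the sketch reports that rdd_0_1 (which lost a replica with exec-3) needs one more copy, while fully replicated rdd_0_0 is left alone.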
[jira] [Created] (SPARK-15354) Topology aware block replication strategies
Shubham Chopra created SPARK-15354: -- Summary: Topology aware block replication strategies Key: SPARK-15354 URL: https://issues.apache.org/jira/browse/SPARK-15354 Project: Spark Issue Type: Sub-task Components: Mesos, Spark Core, YARN Reporter: Shubham Chopra This task adds implementations of resilient block replication strategies for different resource managers, modeled on the 3-replica strategy used by HDFS: the first replica is placed on an executor, the second within the same rack as that executor, and the third on a different rack. The implementation involves two pluggable classes: one running in the driver that provides topology information for every host at cluster start, and a second that prioritizes a list of peer BlockManagerIds. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
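The placement rule described above (second replica on the same rack, third on a different rack) can be sketched as a peer prioritizer. Peer, rackOf-style fields, and prioritize are illustrative names under stated assumptions, not Spark's actual classes:

```scala
// Hedged sketch of an HDFS-style rack-aware peer prioritizer; not Spark's API.
case class Peer(executorId: String, host: String, rack: String)

object RackAwarePrioritizer {
  // Order candidates so the first pick shares the source's rack (cheap copy)
  // and the next picks sit on other racks (failure isolation), mirroring the
  // same-rack-then-different-rack placement described in the issue.
  def prioritize(source: Peer, candidates: Seq[Peer]): Seq[Peer] = {
    val (sameRack, otherRack) = candidates.partition(_.rack == source.rack)
    sameRack.take(1) ++ otherRack ++ sameRack.drop(1)
  }

  def main(args: Array[String]): Unit = {
    val src = Peer("exec-0", "host-0", "rack-1")
    val candidates = Seq(
      Peer("exec-1", "host-1", "rack-2"),
      Peer("exec-2", "host-2", "rack-1"),
      Peer("exec-3", "host-3", "rack-2"))
    println(prioritize(src, candidates).map(_.executorId).mkString(" -> "))
    // prints: exec-2 -> exec-1 -> exec-3
  }
}
```

With two replicas requested, a BlockManager walking this ordered list would place the first copy on the same rack and the second on a different one.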
[jira] [Updated] (SPARK-15353) Making peer selection for block replication pluggable
[ https://issues.apache.org/jira/browse/SPARK-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shubham Chopra updated SPARK-15353: --- Attachment: BlockManagerSequenceDiagram.png Sequence diagram explaining the various calls between BlockManager and BlockManagerMasterEndpoint for topology aware block replication > Making peer selection for block replication pluggable > - > > Key: SPARK-15353 > URL: https://issues.apache.org/jira/browse/SPARK-15353 > Project: Spark > Issue Type: Sub-task > Components: Block Manager, Spark Core >Reporter: Shubham Chopra > Attachments: BlockManagerSequenceDiagram.png > > > BlockManagers running on executors provide all logistics around block > management. Before a BlockManager can be used, it has to be “initialized”. As > a part of the initialization, BlockManager asks the > BlockManagerMasterEndpoint to give it topology information. The > BlockManagerMasterEndpoint is provided a pluggable interface that can be used > to resolve a hostname to topology. This information is used to decorate the > BlockManagerId. This happens at cluster start and whenever a new executor is > added. > During replication, the BlockManager gets the list of all its peers in the > form of a Seq[BlockManagerId]. We add a pluggable prioritizer that can be > used to prioritize this list of peers based on topology information. Peers > with higher priority occur first in the sequence and the BlockManager tries > to replicate blocks in that order. > There would be default implementations for these pluggable interfaces that > replicate the existing behavior of randomly choosing a peer. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-15353) Making peer selection for block replication pluggable
Shubham Chopra created SPARK-15353: -- Summary: Making peer selection for block replication pluggable Key: SPARK-15353 URL: https://issues.apache.org/jira/browse/SPARK-15353 Project: Spark Issue Type: Sub-task Components: Block Manager, Spark Core Reporter: Shubham Chopra BlockManagers running on executors provide all logistics around block management. Before a BlockManager can be used, it has to be “initialized”. As a part of the initialization, BlockManager asks the BlockManagerMasterEndpoint to give it topology information. The BlockManagerMasterEndpoint is provided a pluggable interface that can be used to resolve a hostname to topology. This information is used to decorate the BlockManagerId. This happens at cluster start and whenever a new executor is added. During replication, the BlockManager gets the list of all its peers in the form of a Seq[BlockManagerId]. We add a pluggable prioritizer that can be used to prioritize this list of peers based on topology information. Peers with higher priority occur first in the sequence and the BlockManager tries to replicate blocks in that order. There would be default implementations for these pluggable interfaces that replicate the existing behavior of randomly choosing a peer. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
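The two pluggable pieces described above, plus the random-choice defaults that preserve existing behavior, might be sketched as traits like the following. The trait and method names here are illustrative assumptions, not the actual Spark interfaces:

```scala
import scala.util.Random

// Hedged sketch of the two pluggable interfaces described in the issue.

// Resolves a hostname to a topology string (e.g. a rack), used to decorate
// BlockManagerId-like identifiers at cluster start and when executors join.
trait TopologyMapper {
  def getTopologyForHost(host: String): Option[String]
}

// Prioritizes the peers a BlockManager may replicate to; higher-priority
// peers come first in the returned sequence.
trait BlockReplicationPrioritizer {
  def prioritize(peers: Seq[String], seed: Long): Seq[String]
}

// Default: no topology information is available.
class DefaultTopologyMapper extends TopologyMapper {
  def getTopologyForHost(host: String): Option[String] = None
}

// Default: replicate to peers in random order, matching the existing
// behavior of randomly choosing a peer.
class RandomPrioritizer extends BlockReplicationPrioritizer {
  def prioritize(peers: Seq[String], seed: Long): Seq[String] =
    new Random(seed).shuffle(peers)
}
```

A topology-aware deployment would swap in a TopologyMapper that reads rack information and a prioritizer that prefers off-host, on-rack peers, without touching the BlockManager replication loop itself.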
[jira] [Created] (SPARK-15352) Topology aware block replication
Shubham Chopra created SPARK-15352: -- Summary: Topology aware block replication Key: SPARK-15352 URL: https://issues.apache.org/jira/browse/SPARK-15352 Project: Spark Issue Type: New Feature Components: Block Manager, Mesos, Spark Core, YARN Reporter: Shubham Chopra With cached RDDs, Spark can be used for online analytics, responding to online queries. But the loss of RDD partitions due to node/executor failures can cause large delays in such use cases, as the data has to be regenerated. Cached RDDs, even when using multiple replicas per block, are not currently resilient to node failures when multiple executors are started on the same node: block replication currently chooses a peer at random, and this peer could exist on the same host. This effort would add topology aware replication to Spark, enabled with pluggable strategies. For ease of development/review, it is broken down into three major work-efforts: 1. Making peer selection for replication pluggable 2. Providing pluggable implementations for providing topology and topology aware replication 3. Pro-active replenishment of lost blocks -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org