[jira] [Updated] (SPARK-21344) BinaryType comparison does signed byte array comparison

2017-07-07 Thread Shubham Chopra (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Chopra updated SPARK-21344:
---
Priority: Blocker  (was: Major)

> BinaryType comparison does signed byte array comparison
> ---
>
> Key: SPARK-21344
> URL: https://issues.apache.org/jira/browse/SPARK-21344
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Shubham Chopra
>Priority: Blocker
>
> BinaryType used by Spark SQL defines ordering using signed byte comparisons. 
> This can lead to unexpected behavior. Consider the following code snippet 
> that shows this error:
> {code}
> import org.apache.spark.sql.functions.col
> case class TestRecord(col0: Array[Byte])
> def convertToBytes(i: Long): Array[Byte] = {
>   val bb = java.nio.ByteBuffer.allocate(8)
>   bb.putLong(i)
>   bb.array
> }
> def test = {
>   val sql = spark.sqlContext
>   import sql.implicits._
>   val timestamp = 1498772083037L
>   val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
>   val testDF = sc.parallelize(data).toDF
>   val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 50L))
>   val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) && col("col0") < convertToBytes(timestamp + 100L))
>   val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 100L))
>   assert(filter1.count == 50)
>   assert(filter2.count == 50)
>   assert(filter3.count == 100)
> }
> {code}
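A minimal, self-contained sketch of why the filters above miscount: encoded big-endian, timestamp + 50 ends in byte 0x8f, which a signed comparison treats as -113, so it sorts below the lower bound's trailing byte 0x5d (93). The object and comparator helpers below are illustrative only, written to contrast the two orderings; they are not Spark code.

{code:scala}
// Illustrative only: contrast signed and unsigned lexicographic ordering of the
// big-endian timestamp encoding used in the report above.
object SignedVsUnsigned {
  def convertToBytes(i: Long): Array[Byte] = {
    val bb = java.nio.ByteBuffer.allocate(8)
    bb.putLong(i)
    bb.array
  }

  // What BinaryType effectively does today: compare each byte as a signed value.
  def compareSigned(a: Array[Byte], b: Array[Byte]): Int =
    a.zip(b).map { case (x, y) => x.compare(y) }
      .find(_ != 0).getOrElse(a.length.compare(b.length))

  // The expected ordering: compare each byte as an unsigned value (0..255).
  def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int =
    a.zip(b).map { case (x, y) => (x & 0xff).compare(y & 0xff) }
      .find(_ != 0).getOrElse(a.length.compare(b.length))

  def main(args: Array[String]): Unit = {
    val timestamp = 1498772083037L
    val lo = convertToBytes(timestamp)        // trailing byte 0x5d =   93
    val hi = convertToBytes(timestamp + 50L)  // trailing byte 0x8f = -113 signed, 143 unsigned
    assert(compareUnsigned(lo, hi) < 0)       // unsigned: lo < hi, as the filters expect
    assert(compareSigned(lo, hi) > 0)         // signed:   hi < lo, which is why the counts are off
  }
}
{code}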






[jira] [Created] (SPARK-21344) BinaryType comparison does signed byte array comparison

2017-07-07 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-21344:
--

 Summary: BinaryType comparison does signed byte array comparison
 Key: SPARK-21344
 URL: https://issues.apache.org/jira/browse/SPARK-21344
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.1
Reporter: Shubham Chopra


BinaryType used by Spark SQL defines ordering using signed byte comparisons. 
This can lead to unexpected behavior. Consider the following code snippet that 
shows this error:

{code:scala}
import org.apache.spark.sql.functions.col
case class TestRecord(col0: Array[Byte])
def convertToBytes(i: Long): Array[Byte] = {
  val bb = java.nio.ByteBuffer.allocate(8)
  bb.putLong(i)
  bb.array
}
def test = {
  val sql = spark.sqlContext
  import sql.implicits._
  val timestamp = 1498772083037L
  val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
  val testDF = sc.parallelize(data).toDF
  val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 50L))
  val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) && col("col0") < convertToBytes(timestamp + 100L))
  val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 100L))
  assert(filter1.count == 50)
  assert(filter2.count == 50)
  assert(filter3.count == 100)
}
{code}









[jira] [Updated] (SPARK-21344) BinaryType comparison does signed byte array comparison

2017-07-07 Thread Shubham Chopra (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Chopra updated SPARK-21344:
---
Description: 
BinaryType used by Spark SQL defines ordering using signed byte comparisons. 
This can lead to unexpected behavior. Consider the following code snippet that 
shows this error:

{code}
import org.apache.spark.sql.functions.col
case class TestRecord(col0: Array[Byte])
def convertToBytes(i: Long): Array[Byte] = {
  val bb = java.nio.ByteBuffer.allocate(8)
  bb.putLong(i)
  bb.array
}
def test = {
  val sql = spark.sqlContext
  import sql.implicits._
  val timestamp = 1498772083037L
  val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
  val testDF = sc.parallelize(data).toDF
  val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 50L))
  val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) && col("col0") < convertToBytes(timestamp + 100L))
  val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 100L))
  assert(filter1.count == 50)
  assert(filter2.count == 50)
  assert(filter3.count == 100)
}
{code}




  was:
BinaryType used by Spark SQL defines ordering using signed byte comparisons. 
This can lead to unexpected behavior. Consider the following code snippet that 
shows this error:

{code:scala}
case class TestRecord(col0: Array[Byte])
def convertToBytes(i: Long): Array[Byte] = {
val bb = java.nio.ByteBuffer.allocate(8)
bb.putLong(i)
bb.array
  }
def test = {
val sql = spark.sqlContext
import sql.implicits._
val timestamp = 1498772083037L
val data = (timestamp to timestamp + 1000L).map(i => 
TestRecord(convertToBytes(i)))
val testDF = sc.parallelize(data).toDF
val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && 
col("col0") < convertToBytes(timestamp + 50L))
val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) 
&& col("col0") < convertToBytes(timestamp + 100L))
val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && 
col("col0") < convertToBytes(timestamp + 100L))
assert(filter1.count == 50)
assert(filter2.count == 50)
assert(filter3.count == 100)
}
{code}





> BinaryType comparison does signed byte array comparison
> ---
>
> Key: SPARK-21344
> URL: https://issues.apache.org/jira/browse/SPARK-21344
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Shubham Chopra
>
> BinaryType used by Spark SQL defines ordering using signed byte comparisons. 
> This can lead to unexpected behavior. Consider the following code snippet 
> that shows this error:
> {code}
> import org.apache.spark.sql.functions.col
> case class TestRecord(col0: Array[Byte])
> def convertToBytes(i: Long): Array[Byte] = {
>   val bb = java.nio.ByteBuffer.allocate(8)
>   bb.putLong(i)
>   bb.array
> }
> def test = {
>   val sql = spark.sqlContext
>   import sql.implicits._
>   val timestamp = 1498772083037L
>   val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
>   val testDF = sc.parallelize(data).toDF
>   val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 50L))
>   val filter2 = testDF.filter(col("col0") >= convertToBytes(timestamp + 50L) && col("col0") < convertToBytes(timestamp + 100L))
>   val filter3 = testDF.filter(col("col0") >= convertToBytes(timestamp) && col("col0") < convertToBytes(timestamp + 100L))
>   assert(filter1.count == 50)
>   assert(filter2.count == 50)
>   assert(filter3.count == 100)
> }
> {code}






[jira] [Resolved] (SPARK-15352) Topology aware block replication

2017-06-02 Thread Shubham Chopra (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Chopra resolved SPARK-15352.

Resolution: Fixed

> Topology aware block replication
> 
>
> Key: SPARK-15352
> URL: https://issues.apache.org/jira/browse/SPARK-15352
> Project: Spark
>  Issue Type: New Feature
>  Components: Block Manager, Mesos, Spark Core, YARN
>Reporter: Shubham Chopra
>Assignee: Shubham Chopra
>
> With cached RDDs, Spark can be used for online analytics where it is used to 
> respond to online queries. But loss of RDD partitions due to node/executor 
> failures can cause huge delays in such use cases as the data would have to be 
> regenerated.
> Cached RDDs, even when using multiple replicas per block, are not currently 
> resilient to node failures when multiple executors are started on the same 
> node. Block replication currently chooses a peer at random, and this peer 
> could also exist on the same host. 
> This effort would add topology-aware replication to Spark that can be enabled 
> with pluggable strategies. For ease of development/review, this is being 
> broken down into three major work-efforts:
> 1. Making peer selection for replication pluggable
> 2. Providing pluggable implementations of topology resolution and 
> topology-aware replication
> 3. Pro-active replenishment of lost blocks






[jira] [Updated] (SPARK-20902) Word2Vec implementations with Negative Sampling

2017-05-26 Thread Shubham Chopra (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Chopra updated SPARK-20902:
---
Description: 
Spark MLlib Word2Vec currently only implements Skip-Gram + hierarchical softmax. 
Both Continuous Bag of Words (CBOW) and Skip-Gram have shown comparable or 
better performance with Negative Sampling. This umbrella JIRA tracks the effort 
to add negative-sampling-based implementations of both the CBOW and Skip-Gram 
models to Spark MLlib.

Since word2vec is largely a pre-processing step, performance often depends on 
the application it is being used for and the corpus it is estimated on. These 
implementations give users the choice of picking the one that works best for 
their use case.

  was:Spark MLlib Word2Vec currently only implements Skip-Gram+Hierarchical 
softmax. Both Continuous bag of words (CBOW) and SkipGram have shown 
comparative or better performance with Negative Sampling. This umbrella JIRA is 
to keep a track of the effort to add negative sampling based implementations of 
both CBOW and SkipGram models to Spark MLlib.


> Word2Vec implementations with Negative Sampling
> ---
>
> Key: SPARK-20902
> URL: https://issues.apache.org/jira/browse/SPARK-20902
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, MLlib
>Affects Versions: 2.1.1
>Reporter: Shubham Chopra
>  Labels: ML
>
> Spark MLlib Word2Vec currently only implements Skip-Gram + hierarchical 
> softmax. Both Continuous Bag of Words (CBOW) and Skip-Gram have shown 
> comparable or better performance with Negative Sampling. This umbrella JIRA 
> tracks the effort to add negative-sampling-based implementations of both the 
> CBOW and Skip-Gram models to Spark MLlib.
> Since word2vec is largely a pre-processing step, performance often depends on 
> the application it is being used for and the corpus it is estimated on. These 
> implementations give users the choice of picking the one that works best for 
> their use case.






[jira] [Created] (SPARK-20903) Word2Vec Skip-Gram + Negative Sampling

2017-05-26 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-20903:
--

 Summary: Word2Vec Skip-Gram + Negative Sampling
 Key: SPARK-20903
 URL: https://issues.apache.org/jira/browse/SPARK-20903
 Project: Spark
  Issue Type: Sub-task
  Components: ML, MLlib
Affects Versions: 2.1.1
Reporter: Shubham Chopra


Skip-Gram + Negative Sampling has been shown to be comparable to, or to 
out-perform, the hierarchical-softmax-based approach currently implemented in 
Spark. Since word2vec is largely a pre-processing step, performance often 
depends on the application it is being used for and the corpus it is estimated 
on. These implementations give users the choice of picking the one that works 
best for their use case.
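As context for this sub-task, a rough sketch of the sampling step that distinguishes negative sampling from hierarchical softmax: for each (center, context) pair, k negative words are drawn from the unigram distribution raised to the 3/4 power and used as contrastive examples instead of walking a softmax tree. The function below is illustrative only and not the planned MLlib API.

{code:scala}
import scala.util.Random

// Illustrative only: draw k negative samples from the unigram distribution
// raised to the 3/4 power, the sampling scheme used by skip-gram negative sampling.
def negativeSamples(wordCounts: Array[Long], k: Int, positive: Int, rng: Random): Array[Int] = {
  val weights = wordCounts.map(c => math.pow(c.toDouble, 0.75))
  val cumulative = weights.scanLeft(0.0)(_ + _).tail   // running totals for inverse-CDF sampling
  val total = cumulative.last
  Array.fill(k) {
    var w = positive
    while (w == positive) {                             // never use the true context word as a negative
      val u = rng.nextDouble() * total
      val idx = cumulative.indexWhere(_ >= u)
      w = if (idx >= 0) idx else wordCounts.length - 1
    }
    w
  }
}
{code}

In the training loop, the dot products with these k sampled word vectors (plus the true context word) would replace the hierarchical-softmax update.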






[jira] [Updated] (SPARK-20372) Word2Vec Continuous Bag Of Words model

2017-05-26 Thread Shubham Chopra (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Chopra updated SPARK-20372:
---
Issue Type: Sub-task  (was: Improvement)
Parent: SPARK-20902

> Word2Vec Continuous Bag Of Words model
> --
>
> Key: SPARK-20372
> URL: https://issues.apache.org/jira/browse/SPARK-20372
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 2.1.0
>Reporter: Shubham Chopra
>







[jira] [Created] (SPARK-20902) Word2Vec implementations with Negative Sampling

2017-05-26 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-20902:
--

 Summary: Word2Vec implementations with Negative Sampling
 Key: SPARK-20902
 URL: https://issues.apache.org/jira/browse/SPARK-20902
 Project: Spark
  Issue Type: Improvement
  Components: ML, MLlib
Affects Versions: 2.1.1
Reporter: Shubham Chopra


Spark MLlib Word2Vec currently only implements Skip-Gram + hierarchical softmax. 
Both Continuous Bag of Words (CBOW) and Skip-Gram have shown comparable or 
better performance with Negative Sampling. This umbrella JIRA tracks the effort 
to add negative-sampling-based implementations of both the CBOW and Skip-Gram 
models to Spark MLlib.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20372) Word2Vec Continuous Bag Of Words model

2017-04-18 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-20372:
--

 Summary: Word2Vec Continuous Bag Of Words model
 Key: SPARK-20372
 URL: https://issues.apache.org/jira/browse/SPARK-20372
 Project: Spark
  Issue Type: Improvement
  Components: ML
Affects Versions: 2.1.0
Reporter: Shubham Chopra
 Fix For: 2.2.0









[jira] [Commented] (SPARK-19803) Flaky BlockManagerProactiveReplicationSuite tests

2017-03-27 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943460#comment-15943460
 ] 

Shubham Chopra commented on SPARK-19803:


The PR forces a refresh of the peer list cached at the executor that is trying 
to proactively replicate the block. This ensures that an executor never tries 
to replicate to a previously failed peer because of a stale reference. In 
addition, the unit test now explicitly stops block managers when they are 
removed from the master.
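As a rough illustration of the idea (hypothetical names, not the actual PR code): proactive replication forces a fresh peer lookup instead of trusting a possibly stale cached list, so executors that have already failed never show up as replication targets.

{code:scala}
// Hypothetical sketch of the stale-cache problem the fix addresses: force a
// fresh peer lookup before proactive replication so failed executors are skipped.
class PeerCache(fetchFromMaster: () => Seq[String]) {
  private var cachedPeers: Seq[String] = Seq.empty

  def getPeers(forceFetch: Boolean): Seq[String] = synchronized {
    if (forceFetch || cachedPeers.isEmpty) {
      cachedPeers = fetchFromMaster()   // re-ask the master; dead executors drop out
    }
    cachedPeers
  }
}

// Proactive replication requests a forced refresh instead of reusing the cache.
def replicateProactively(cache: PeerCache, replicateTo: String => Unit): Unit =
  cache.getPeers(forceFetch = true).foreach(replicateTo)
{code}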

> Flaky BlockManagerProactiveReplicationSuite tests
> -
>
> Key: SPARK-19803
> URL: https://issues.apache.org/jira/browse/SPARK-19803
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Tests
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>Assignee: Shubham Chopra
>  Labels: flaky-test
> Fix For: 2.2.0
>
>
> The tests added for BlockManagerProactiveReplicationSuite have made the 
> Jenkins build flaky. Please refer to the build for more details: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73640/testReport/






[jira] [Commented] (SPARK-19803) Flaky BlockManagerProactiveReplicationSuite tests

2017-03-23 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938676#comment-15938676
 ] 

Shubham Chopra commented on SPARK-19803:


Any feedback on the PR - https://github.com/apache/spark/pull/17325 ? 

> Flaky BlockManagerProactiveReplicationSuite tests
> -
>
> Key: SPARK-19803
> URL: https://issues.apache.org/jira/browse/SPARK-19803
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Tests
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>Assignee: Shubham Chopra
>  Labels: flaky-test
> Fix For: 2.2.0
>
>
> The tests added for BlockManagerProactiveReplicationSuite have made the 
> Jenkins build flaky. Please refer to the build for more details: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73640/testReport/






[jira] [Closed] (SPARK-16878) YARN - topology information

2017-03-17 Thread Shubham Chopra (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Chopra closed SPARK-16878.
--
Resolution: Unresolved

Withdrawing the issue. As a stop-gap, a file-based topology mapper can be used 
where needed.
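A minimal sketch of what such a file-based topology mapper can look like, assuming a plain properties file of hostname=rack entries; the class and file format here are illustrative rather than a specific Spark API.

{code:scala}
import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

// Illustrative file-based topology mapper: reads "hostname=rack" pairs from a
// properties file and answers rack lookups for block-placement decisions.
class SimpleFileTopologyMapper(path: String) {
  private val props = new Properties()
  private val in = new FileInputStream(path)
  try props.load(in) finally in.close()

  private val hostToRack: Map[String, String] = props.asScala.toMap

  def getTopologyForHost(hostname: String): Option[String] = hostToRack.get(hostname)
}
{code}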

> YARN - topology information 
> 
>
> Key: SPARK-16878
> URL: https://issues.apache.org/jira/browse/SPARK-16878
> Project: Spark
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: Shubham Chopra
>
> Block replication strategies need topology information for ideal block 
> placement. This information is available in resource managers and/or can be 
> provided separately through scripts/services/classes, as in the case of HDFS. 
> This JIRA focuses on enhancing the spark-yarn package to extract topology 
> information from YARN.






[jira] [Commented] (SPARK-19803) Flaky BlockManagerProactiveReplicationSuite tests

2017-03-15 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927396#comment-15927396
 ] 

Shubham Chopra commented on SPARK-19803:


I am looking into this and will try to submit a fix in a day or so. Mostly 
trying to isolate the race condition and simplify the test cases. 

> Flaky BlockManagerProactiveReplicationSuite tests
> -
>
> Key: SPARK-19803
> URL: https://issues.apache.org/jira/browse/SPARK-19803
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Tests
>Affects Versions: 2.2.0
>Reporter: Sital Kedia
>Assignee: Shubham Chopra
>  Labels: flaky-test
> Fix For: 2.2.0
>
>
> The tests added for BlockManagerProactiveReplicationSuite have made the 
> Jenkins build flaky. Please refer to the build for more details: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73640/testReport/






[jira] [Commented] (SPARK-15354) Topology aware block replication strategies

2016-08-08 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411903#comment-15411903
 ] 

Shubham Chopra commented on SPARK-15354:


This is part of the larger umbrella JIRA 
[SPARK-15352|https://issues.apache.org/jira/browse/SPARK-15352]. We envision 
using Spark to respond to online queries in near real-time, with data cached in 
RDDs/DataFrames. In case of failures, where blocks of data backing an 
RDD/DataFrame are lost, regenerating the data at "query time" would result in a 
significant hit on query performance. Lineage ensures there is no data loss, 
but we also need to ensure query performance doesn't degrade. 

The cached data, therefore, needs to be fail-safe to some extent. This problem 
is very similar to HDFS replication, which provides both higher read throughput 
and high availability. 

When run on YARN, Spark executors are housed inside YARN containers, and there 
can be multiple containers on the same node. With random block placement, if 
both replicas of a block end up on the same node, you are susceptible to node 
loss. An HDFS-like approach would be a very basic way of addressing this. Note 
that the hosts chosen this way would still be random. Since the replication 
objectives to be met can differ across topologies, a more general approach 
would work better.

> Topology aware block replication strategies
> ---
>
> Key: SPARK-15354
> URL: https://issues.apache.org/jira/browse/SPARK-15354
> Project: Spark
>  Issue Type: Sub-task
>  Components: Mesos, Spark Core, YARN
>Reporter: Shubham Chopra
>
> Implement resilient block replication strategies for different resource 
> managers that mirror the 3-replica strategy used by HDFS: the first replica 
> on an executor, the second within the same rack as that executor, and the 
> third on a different rack. 
> The implementation involves providing two pluggable classes: one running in 
> the driver that provides topology information for every host at cluster 
> start, and the other prioritizing a list of peer BlockManagerIds.






[jira] [Created] (SPARK-16878) YARN - topology information

2016-08-03 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-16878:
--

 Summary: YARN - topology information 
 Key: SPARK-16878
 URL: https://issues.apache.org/jira/browse/SPARK-16878
 Project: Spark
  Issue Type: Sub-task
  Components: YARN
Reporter: Shubham Chopra


Block replication strategies need topology information for ideal block 
placement. This information is available in resource managers and/or can be 
provided separately through scripts/services/classes, as in the case of HDFS. 
This JIRA focuses on enhancing the spark-yarn package to extract topology 
information from YARN.






[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-25 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15392680#comment-15392680
 ] 

Shubham Chopra commented on SPARK-16550:


[~rxin] I did some basic testing using the patch, and it shows the desired 
replication. Thanks [~ekhliang]!

The patch, however, does fail some unit tests in DistributedSuite.

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>Assignee: Josh Rosen
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> {noformat}
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> {noformat}
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> {noformat}
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> {noformat}
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> {noformat}
> works as intended.
> But 
> {noformat}
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> {noformat}
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2






[jira] [Updated] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Shubham Chopra (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Chopra updated SPARK-16550:
---
Priority: Blocker  (was: Major)

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>Priority: Blocker
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> works as intended.
> But 
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_SER_2)
> data4.count
> Again doesn't replicate data and executors show the same 
> ClassNotFoundException
> These examples worked fine and showed expected results with Spark 1.6.2






[jira] [Comment Edited] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378478#comment-15378478
 ] 

Shubham Chopra edited comment on SPARK-16550 at 7/14/16 10:08 PM:
--

Example code: 
{noformat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
    val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(data.count)
    println(s"Total number of blocks in data: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

    val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataTestInt.count)
    println(s"Total number of blocks in dataTestInt: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

    val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 10).persist(StorageLevel.MEMORY_ONLY_2)
    println(dataInteger.count)
    println(s"Total number of blocks in dataInteger: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

    val dataSerialized = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataSerialized.count)
    println(s"Total number of blocks in dataSerialized: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

    val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
    println(dataTestIntSer.count)
    println(s"Total number of blocks in dataTestIntSer: ${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")
  }
}
{noformat}

Output:
{quote}
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10
{quote}

The issue exists when I submit a compiled program as well. The exception stack 
traces are similar to the ones posted above.

I think part of the problem might be related to 
[SPARK-13990|https://issues.apache.org/jira/browse/SPARK-13990]

This code works fine in Spark 1.6.2, both in the shell and when submitted as 
compiled code.


was (Author: shubhamc):
Example code: 
{quote}
case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
println(data.count)
println(s"Total number of blocks in data: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataTestInt.count)
println(s"Total number of blocks in dataTestInt: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataInteger.count)
println(s"Total number of blocks in dataInteger: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

val dataSerialized = sc.parallelize(1 to 1000, 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataSerialized.count)
println(s"Total number of blocks in dataSerialized: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataTestIntSer.count)
println(s"Total number of blocks in dataTestIntSer: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")

  }
}
{quote}

Output:
{quote}
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10
{quote}

The issue exists when I submit a compiled program as well. The exception stack 
traces are similar to the ones posted above.

I think part of the problem might be related to 
https://issues.apache.org/jira/browse/SPARK-13990

This code works fine in Spark 1.6.2, both in the shell and when submitted as 
compiled code.

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 

[jira] [Comment Edited] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378478#comment-15378478
 ] 

Shubham Chopra edited comment on SPARK-16550 at 7/14/16 10:05 PM:
--

Example code: 
{quote}
case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
println(data.count)
println(s"Total number of blocks in data: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataTestInt.count)
println(s"Total number of blocks in dataTestInt: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataInteger.count)
println(s"Total number of blocks in dataInteger: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

val dataSerialized = sc.parallelize(1 to 1000, 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataSerialized.count)
println(s"Total number of blocks in dataSerialized: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataTestIntSer.count)
println(s"Total number of blocks in dataTestIntSer: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")

  }
}
{quote}

Output:
{quote}
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10
{quote}

The issue exists when I submit a compiled program as well. The exception stack 
traces are similar to the ones posted above.

I think part of the problem might be related to 
https://issues.apache.org/jira/browse/SPARK-13990

This code works fine in Spark 1.6.2, both in the shell and when submitted as 
compiled code.


was (Author: shubhamc):
Example code: 
```
case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
println(data.count)
println(s"Total number of blocks in data: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataTestInt.count)
println(s"Total number of blocks in dataTestInt: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataInteger.count)
println(s"Total number of blocks in dataInteger: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

val dataSerialized = sc.parallelize(1 to 1000, 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataSerialized.count)
println(s"Total number of blocks in dataSerialized: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataTestIntSer.count)
println(s"Total number of blocks in dataTestIntSer: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")

  }
}
```

Output:
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10

The issue exists when I submit a compiled program as well. The exception stack 
traces are similar to the ones posted above.

I think part of the problem might be related to 
https://issues.apache.org/jira/browse/SPARK-13990

This code works fine in Spark 1.6.2, both in the shell and when submitted as 
compiled code.

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>

[jira] [Commented] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Shubham Chopra (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378478#comment-15378478
 ] 

Shubham Chopra commented on SPARK-16550:


Example code: 
```
case class TestInteger(i: Int)

object TestApp {
  def main(args: Array[String]) {
val conf = (new SparkConf).setAppName("Test app").setMaster("yarn-client")
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
println(data.count)
println(s"Total number of blocks in data: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum}")

val dataTestInt = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataTestInt.count)
println(s"Total number of blocks in dataTestInt: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestInt.id).size).sum}")

val dataInteger = sc.parallelize((1 to 1000).map(new Integer(_)), 
10).persist(StorageLevel.MEMORY_ONLY_2)
println(dataInteger.count)
println(s"Total number of blocks in dataInteger: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataInteger.id).size).sum}")

val dataSerialized = sc.parallelize(1 to 1000, 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataSerialized.count)
println(s"Total number of blocks in dataSerialized: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataSerialized.id).size).sum}")

val dataTestIntSer = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(StorageLevel.MEMORY_ONLY_SER_2)
println(dataTestIntSer.count)
println(s"Total number of blocks in dataTestIntSer: 
${sc.getExecutorStorageStatus.map(_.rddBlocksById(dataTestIntSer.id).size).sum}")

  }
}
```

Output:
1000
Total number of blocks in data: 10
1000
Total number of blocks in dataTestInt: 10
1000
Total number of blocks in dataInteger: 20
1000
Total number of blocks in dataSerialized: 20
1000
Total number of blocks in dataTestIntSer: 10

The issue exists when I submit a compiled program as well. The exception stack 
traces are similar to the ones posted above.

I think part of the problem might be related to 
https://issues.apache.org/jira/browse/SPARK-13990

This code works fine in Spark 1.6.2, both in the shell and when submitted as 
compiled code.

> Caching data with replication doesn't replicate data
> 
>
> Key: SPARK-16550
> URL: https://issues.apache.org/jira/browse/SPARK-16550
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0
>Reporter: Shubham Chopra
>
> Caching multiple replicas of blocks is currently broken. The following 
> examples show replication doesn't happen for various use-cases:
> These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode
> case class TestInteger(i: Int)
> val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 10).persist(MEMORY_ONLY_2)
> data.count
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
> only 10 blocks as opposed to the expected 20
> Block replication fails on the executors with a java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
> val data1 = sc.parallelize(1 to 1000, 
> 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
> data1.count
> Block replication again fails with the following errors:
> 16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 8567643992794608648
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
> 13994
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>   at 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>   at 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
> sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum 
> again shows 10 blocks
> Caching serialized data works for native types, but not for custom classes
> val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
> data3.count
> works as intended.
> But 
> val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
> 

[jira] [Created] (SPARK-16550) Caching data with replication doesn't replicate data

2016-07-14 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-16550:
--

 Summary: Caching data with replication doesn't replicate data
 Key: SPARK-16550
 URL: https://issues.apache.org/jira/browse/SPARK-16550
 Project: Spark
  Issue Type: Bug
  Components: Block Manager, Spark Core
Affects Versions: 2.0.0
Reporter: Shubham Chopra
Priority: Blocker


Caching multiple replicas of blocks is currently broken. The following examples 
show replication doesn't happen for various use-cases:

These were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode

case class TestInteger(i: Int)
val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(MEMORY_ONLY_2)
data.count
sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows 
only 10 blocks as opposed to the expected 20
Block replication fails on the executors with a java.lang.RuntimeException: 
java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger

val data1 = sc.parallelize(1 to 1000, 
10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
data1.count

Block replication again fails with the following errors:
16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking 
RpcHandler#receive() on RPC id 8567643992794608648
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
13994
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)

sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum again 
shows 10 blocks

Caching serialized data works for native types, but not for custom classes

val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
data3.count

works as intended.

But 
val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 
10).persist(MEMORY_ONLY_SER_2)
data4.count

Again doesn't replicate data and executors show the same ClassNotFoundException

These examples worked fine and showed expected results with Spark 1.6.2






[jira] [Created] (SPARK-15355) Pro-active block replenishment in case of node/executor failures

2016-05-16 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-15355:
--

 Summary: Pro-active block replenishment in case of node/executor 
failures
 Key: SPARK-15355
 URL: https://issues.apache.org/jira/browse/SPARK-15355
 Project: Spark
  Issue Type: Sub-task
  Components: Block Manager, Spark Core
Reporter: Shubham Chopra


Spark currently does not replenish lost replicas. For resiliency and high 
availability, the BlockManagerMasterEndpoint can proactively verify whether all 
cached RDDs have enough replicas, and replenish them when they don't.
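As an illustration of the proposed behavior (hypothetical types and names, not the eventual implementation), the master-side check amounts to comparing each block's live replica count against its target and asking a surviving holder to replicate the shortfall:

{code:scala}
// Hypothetical sketch of proactive replenishment: when an executor is lost,
// find blocks that dropped below their target replica count and ask one of
// the remaining holders to replicate the shortfall.
case class BlockInfo(blockId: String, targetReplicas: Int, holders: Set[String])

def replenishAfterLoss(
    blocks: Seq[BlockInfo],
    lostExecutor: String,
    askToReplicate: (String, String, Int) => Unit): Unit = {
  blocks.foreach { b =>
    val remaining = b.holders - lostExecutor
    val missing = b.targetReplicas - remaining.size
    if (missing > 0 && remaining.nonEmpty) {
      // pick any surviving holder and request the missing number of extra replicas
      askToReplicate(remaining.head, b.blockId, missing)
    }
  }
}
{code}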






[jira] [Created] (SPARK-15354) Topology aware block replication strategies

2016-05-16 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-15354:
--

 Summary: Topology aware block replication strategies
 Key: SPARK-15354
 URL: https://issues.apache.org/jira/browse/SPARK-15354
 Project: Spark
  Issue Type: Sub-task
  Components: Mesos, Spark Core, YARN
Reporter: Shubham Chopra


Implement resilient block replication strategies for different resource 
managers that mirror the 3-replica strategy used by HDFS: the first replica on 
an executor, the second within the same rack as that executor, and the third on 
a different rack. 
The implementation involves providing two pluggable classes: one running in the 
driver that provides topology information for every host at cluster start, and 
the other prioritizing a list of peer BlockManagerIds.
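As an illustration of the prioritization described above (hypothetical types, not the final API), an HDFS-style ordering offers one peer in the same rack as the source first, then one peer in a different rack, then the remaining peers:

{code:scala}
import scala.util.Random

// Hypothetical sketch of an HDFS-style peer ordering: one in-rack peer first,
// then one off-rack peer, then everything else in random order.
case class Peer(host: String, rack: Option[String])

def prioritizeHdfsStyle(source: Peer, peers: Seq[Peer], rng: Random): Seq[Peer] = {
  // Shuffle first so ties are broken randomly, then split by rack membership.
  val (inRack, offRack) =
    rng.shuffle(peers).partition(p => p.rack.isDefined && p.rack == source.rack)
  // Second replica: a same-rack peer; third replica: a different-rack peer.
  inRack.headOption.toSeq ++ offRack.headOption.toSeq ++ inRack.drop(1) ++ offRack.drop(1)
}
{code}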







[jira] [Updated] (SPARK-15353) Making peer selection for block replication pluggable

2016-05-16 Thread Shubham Chopra (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Chopra updated SPARK-15353:
---
Attachment: BlockManagerSequenceDiagram.png

Sequence diagram explaining the various calls between BlockManager and 
BlockManagerMasterEndpoint for topology aware block replication

> Making peer selection for block replication pluggable
> -
>
> Key: SPARK-15353
> URL: https://issues.apache.org/jira/browse/SPARK-15353
> Project: Spark
>  Issue Type: Sub-task
>  Components: Block Manager, Spark Core
>Reporter: Shubham Chopra
> Attachments: BlockManagerSequenceDiagram.png
>
>
> BlockManagers running on executors provide all logistics around block 
> management. Before a BlockManager can be used, it has to be “initialized”. As 
> a part of the initialization, BlockManager asks the 
> BlockManagerMasterEndpoint to give it topology information. The 
> BlockManagerMasterEndpoint is provided a pluggable interface that can be used 
> to resolve a hostname to topology. This information is used to decorate the 
> BlockManagerId. This happens at cluster start and whenever a new executor is 
> added.
> During replication, the BlockManager gets the list of all its peers in the 
> form of a Seq[BlockManagerId]. We add a pluggable prioritizer that can be 
> used to prioritize this list of peers based on topology information. Peers 
> with higher priority occur first in the sequence and the BlockManager tries 
> to replicate blocks in that order.
> There would be default implementations for these pluggable interfaces that 
> replicate the existing behavior of randomly choosing a peer.






[jira] [Created] (SPARK-15353) Making peer selection for block replication pluggable

2016-05-16 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-15353:
--

 Summary: Making peer selection for block replication pluggable
 Key: SPARK-15353
 URL: https://issues.apache.org/jira/browse/SPARK-15353
 Project: Spark
  Issue Type: Sub-task
  Components: Block Manager, Spark Core
Reporter: Shubham Chopra


BlockManagers running on executors provide all logistics around block 
management. Before a BlockManager can be used, it has to be “initialized”. As a 
part of the initialization, BlockManager asks the BlockManagerMasterEndpoint to 
give it topology information. The BlockManagerMasterEndpoint is provided a 
pluggable interface that can be used to resolve a hostname to topology. This 
information is used to decorate the BlockManagerId. This happens at cluster 
start and whenever a new executor is added.
During replication, the BlockManager gets the list of all its peers in the form 
of a Seq[BlockManagerId]. We add a pluggable prioritizer that can be used to 
prioritize this list of peers based on topology information. Peers with higher 
priority occur first in the sequence and the BlockManager tries to replicate 
blocks in that order.
There would be default implementations for these pluggable interfaces that 
replicate the existing behavior of randomly choosing a peer.
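For illustration (hypothetical trait and names, not the final API), the pluggable hook reduces to an ordering function over peers, with a default that keeps today's random behavior; a rack-aware strategy like the one sketched under SPARK-15354 above would implement the same hook:

{code:scala}
import scala.util.Random

// Hypothetical hook: order the candidate peers; replication walks the list in order.
trait BlockReplicationPrioritizer {
  def prioritize(peers: Seq[String], rng: Random): Seq[String]
}

// Default implementation: random order, i.e. the existing behavior.
class RandomPrioritizer extends BlockReplicationPrioritizer {
  override def prioritize(peers: Seq[String], rng: Random): Seq[String] = rng.shuffle(peers)
}
{code}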






[jira] [Created] (SPARK-15352) Topology aware block replication

2016-05-16 Thread Shubham Chopra (JIRA)
Shubham Chopra created SPARK-15352:
--

 Summary: Topology aware block replication
 Key: SPARK-15352
 URL: https://issues.apache.org/jira/browse/SPARK-15352
 Project: Spark
  Issue Type: New Feature
  Components: Block Manager, Mesos, Spark Core, YARN
Reporter: Shubham Chopra


With cached RDDs, Spark can be used for online analytics where it is used to 
respond to online queries. But loss of RDD partitions due to node/executor 
failures can cause huge delays in such use cases as the data would have to be 
regenerated.
Cached RDDs, even when using multiple replicas per block, are not currently 
resilient to node failures when multiple executors are started on the same 
node. Block replication currently chooses a peer at random, and this peer could 
also exist on the same host. 
This effort would add topology-aware replication to Spark that can be enabled 
with pluggable strategies. For ease of development/review, this is being broken 
down into three major work-efforts:
1. Making peer selection for replication pluggable
2. Providing pluggable implementations of topology resolution and 
topology-aware replication
3. Pro-active replenishment of lost blocks



