[jira] [Updated] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00

2016-02-10 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-13268:
-
Description: 
There is an issue with how timestamps are displayed and converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone; however, if we do so, the output actually contains a -8 hour 
offset:

{code}
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
{code}

This result is confusing and unintuitive, and it introduces issues when 
converting DataFrames containing timestamps to RDDs that are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by one day. 
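The offset comes from `java.sql.Timestamp.toString()`, which formats the stored epoch milliseconds in the JVM's default time zone rather than in GMT. A minimal standalone sketch of that behavior (the zone names and epoch value here are illustrative, not taken from the report):

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampToStringDemo {
    public static void main(String[] args) {
        // 2015-01-01T00:00:00Z expressed as epoch milliseconds
        long epochMillis = 1420070400000L;

        // toString() renders in the JVM default time zone, so the same
        // instant prints differently depending on the zone setting.
        TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));
        System.out.println(new Timestamp(epochMillis)); // 2014-12-31 16:00:00.0

        TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
        System.out.println(new Timestamp(epochMillis)); // 2015-01-01 00:00:00.0
    }
}
```

The Timestamp object stores the same instant in both cases; only the string rendering changes with the default zone.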


  was:
There is an issue with how timestamps are displayed/converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone, however, if we do so, we see that the output actually contains a 
-8 hour offset:

{{ 
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
}}

This result is confusing, unintuitive, and introduces issues when converting 
from DataFrames containing timestamps to RDDs which are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by 1 day. 



> SQL Timestamp stored as GMT but toString returns GMT-08:00
> --
>
> Key: SPARK-13268
> URL: https://issues.apache.org/jira/browse/SPARK-13268
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Ilya Ganelin
>
> There is an issue with how timestamps are displayed/converted to Strings in 
> Spark SQL. The documentation states that the timestamp should be created in 
> the GMT time zone, however, if we do so, we see that the output actually 
> contains a -8 hour offset:
> {code}
> new 
> Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
> res144: java.sql.Timestamp = 2014-12-31 16:00:00.0
> new 
> Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
> res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
> {code}
> This result is confusing, unintuitive, and introduces issues when converting 
> from DataFrames containing timestamps to RDDs which are then saved as text. 
> This has the effect of essentially shifting all dates in a dataset by 1 day. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00

2016-02-10 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-13268:


 Summary: SQL Timestamp stored as GMT but toString returns GMT-08:00
 Key: SPARK-13268
 URL: https://issues.apache.org/jira/browse/SPARK-13268
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.0
Reporter: Ilya Ganelin


There is an issue with how timestamps are displayed and converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone; however, if we do so, the output actually contains a -8 hour 
offset:

{{ 
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
}}

This result is confusing and unintuitive, and it introduces issues when 
converting DataFrames containing timestamps to RDDs that are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by one day. 







[jira] [Updated] (SPARK-13268) SQL Timestamp stored as GMT but toString returns GMT-08:00

2016-02-10 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-13268:
-
Description: 
There is an issue with how timestamps are displayed and converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone; however, if we do so, the output actually contains a -8 hour 
offset:

{code}
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
{code}

This result is confusing and unintuitive, and it introduces issues when 
converting DataFrames containing timestamps to RDDs that are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by one day. 

The suggested fix is to update the timestamp toString representation to either 
(a) include the time zone or (b) display correctly in GMT.

This change may well introduce substantial and insidious bugs, so I'm not sure 
how best to resolve this.
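Option (b) can be sketched with java.time: format the Timestamp's underlying instant at an explicit UTC offset, so the rendering no longer depends on the JVM default zone. The helper name below is hypothetical, not part of any Spark or JDK API:

```java
import java.sql.Timestamp;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampGmtRender {
    // Hypothetical helper illustrating option (b): render the underlying
    // instant in GMT regardless of the JVM's default time zone.
    static String toGmtString(Timestamp ts) {
        return ts.toInstant()
                 .atZone(ZoneOffset.UTC)
                 .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S"));
    }

    public static void main(String[] args) {
        Timestamp ts = new Timestamp(1420070400000L); // 2015-01-01T00:00:00Z
        System.out.println(toGmtString(ts)); // 2015-01-01 00:00:00.0, in any zone
    }
}
```

Option (a) would be similar but with a zone suffix in the pattern (e.g. appending `XXX`), which changes the string format and therefore risks breaking downstream parsers.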


  was:
There is an issue with how timestamps are displayed/converted to Strings in 
Spark SQL. The documentation states that the timestamp should be created in the 
GMT time zone, however, if we do so, we see that the output actually contains a 
-8 hour offset:

{code}
new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
res144: java.sql.Timestamp = 2014-12-31 16:00:00.0

new 
Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
{code}

This result is confusing, unintuitive, and introduces issues when converting 
from DataFrames containing timestamps to RDDs which are then saved as text. 
This has the effect of essentially shifting all dates in a dataset by 1 day. 



> SQL Timestamp stored as GMT but toString returns GMT-08:00
> --
>
> Key: SPARK-13268
> URL: https://issues.apache.org/jira/browse/SPARK-13268
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Ilya Ganelin
>
> There is an issue with how timestamps are displayed/converted to Strings in 
> Spark SQL. The documentation states that the timestamp should be created in 
> the GMT time zone, however, if we do so, we see that the output actually 
> contains a -8 hour offset:
> {code}
> new 
> Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT]").toInstant.toEpochMilli)
> res144: java.sql.Timestamp = 2014-12-31 16:00:00.0
> new 
> Timestamp(ZonedDateTime.parse("2015-01-01T00:00:00Z[GMT-08:00]").toInstant.toEpochMilli)
> res145: java.sql.Timestamp = 2015-01-01 00:00:00.0
> {code}
> This result is confusing, unintuitive, and introduces issues when converting 
> from DataFrames containing timestamps to RDDs which are then saved as text. 
> This has the effect of essentially shifting all dates in a dataset by 1 day. 
> The suggested fix for this is to update the timestamp toString representation 
> to either a) Include timezone or b) Correctly display in GMT.
> This change may well introduce substantial and insidious bugs so I'm not sure 
> how best to resolve this.






[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-28 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073236#comment-15073236
 ] 

Ilya Ganelin commented on SPARK-12488:
--

I'll submit a dataset that causes this when I have a moment. Thanks!



Thank you,
Ilya Ganelin





> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generates 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}
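A valid term ID must index into the vocabulary, i.e. lie in [0, vocabSize). A quick sanity check along these lines (a sketch, not part of MLlib) makes the invalid entries in the arrays above easy to count:

```java
import java.util.Arrays;

public class TermIdCheck {
    // Term IDs index into the vocabulary, so anything outside
    // [0, vocabSize) signals the bug described in this issue.
    static long countInvalid(int[] termIds, int vocabSize) {
        return Arrays.stream(termIds)
                     .filter(id -> id < 0 || id >= vocabSize)
                     .count();
    }

    public static void main(String[] args) {
        int vocabSize = 685; // vocabulary size from the report
        int[] sample = {617, 616, 9947563, 2064860663, -26690004, 0};
        System.out.println(countInvalid(sample, vocabSize)); // prints 3
    }
}
```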






[jira] [Updated] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-12488:
-
Description: 
When running the LDA model and using the describeTopics function, invalid 
values appear in the returned termID list:

The example below generates 10 topics on a data set with a vocabulary of 685 
terms.

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted.reverse
res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 
585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 
569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 
553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 
537, 536, 535, 534, 533, 532, 53...
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted
res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
-1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
-1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
-1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
-1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
-1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
-1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
-563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
-26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 
39, 40, 41, 42, 43, 44, 45, 4...
{code}

  was:
When running the LDA model, and using the describeTopics function, invalid 
values appear in the termID list that is returned:

The below example generated 10 topics on a data set with a vocabulary of 685.

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted.reverse
res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 
585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 
569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 
553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 
537, 536, 535, 534, 533, 532, 53...
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted
res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
-1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
-1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
-1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
-1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
-1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
-1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
-563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
-26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 
39, 40, 41, 42, 43, 44, 45, 4...
{code}


> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2

[jira] [Updated] (SPARK-12488) LDA Describe Topics Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-12488:
-
Description: 
When running the LDA model, and using the describeTopics function, invalid 
values appear in the termID list that is returned:

The below example generated 10 topics on a data set with a vocabulary of 685.

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted.reverse
res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 586, 
585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 571, 570, 
569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 556, 555, 554, 
553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 541, 540, 539, 538, 
537, 536, 535, 534, 533, 532, 53...
{code}

{code}
scala> ldaModel.describeTopics()(0)._1.sorted
res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
-1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
-1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
-1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
-1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
-1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
-1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
-563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
-26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 
39, 40, 41, 42, 43, 44, 45, 4...
{code}

  was:
When running the LDA model, and using the describeTopics function, invalid 
values appear in the termID list that is returned:

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}


> LDA Describe Topics Generates Invalid Term IDs
> --
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generated 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, 

[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15068761#comment-15068761
 ] 

Ilya Ganelin commented on SPARK-12488:
--

@jkbradley Would love your feedback here. Thanks!

> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generates 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}






[jira] [Updated] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-12488:
-
Summary: LDA describeTopics() Generates Invalid Term IDs  (was: LDA 
Describe Topics Generates Invalid Term IDs)

> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generated 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}






[jira] [Created] (SPARK-12488) LDA Describe Topics Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-12488:


 Summary: LDA Describe Topics Generates Invalid Term IDs
 Key: SPARK-12488
 URL: https://issues.apache.org/jira/browse/SPARK-12488
 Project: Spark
  Issue Type: Bug
  Components: MLlib
Affects Versions: 1.5.2
Reporter: Ilya Ganelin


When running the LDA model and using the describeTopics function, invalid 
values appear in the returned termID list:

{code}

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

val ldaModel = lda.run(docTermVector)
val distModel = 
ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
{code}






[jira] [Commented] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15068827#comment-15068827
 ] 

Ilya Ganelin commented on SPARK-12488:
--

Further investigation identifies the issue as stemming from the docTermVector 
containing zero vectors (i.e., documents in which no words from the vocabulary 
are present).
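Given that diagnosis, one workaround is to drop all-zero document vectors from the corpus before calling lda.run. A plain-array sketch of the filtering idea (the method name is hypothetical; in Spark this would be a filter on the RDD of document vectors):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterZeroVectors {
    // Hypothetical workaround sketch: drop documents whose term-count
    // vector is all zeros before handing the corpus to LDA.
    static List<double[]> dropEmptyDocs(List<double[]> docTermVectors) {
        return docTermVectors.stream()
                .filter(v -> Arrays.stream(v).sum() > 0.0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<double[]> docs = Arrays.asList(
                new double[]{1, 0, 2},
                new double[]{0, 0, 0},   // empty document: triggers the bug
                new double[]{0, 3, 0});
        System.out.println(dropEmptyDocs(docs).size()); // prints 2
    }
}
```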

> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generates 10 topics on a data set with a vocabulary of 685.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}






[jira] [Comment Edited] (SPARK-12488) LDA describeTopics() Generates Invalid Term IDs

2015-12-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15068761#comment-15068761
 ] 

Ilya Ganelin edited comment on SPARK-12488 at 12/22/15 9:32 PM:


[~josephkb] Would love your feedback here. Thanks!


was (Author: ilganeli):
@jkbradley Would love your feedback here. Thanks!

> LDA describeTopics() Generates Invalid Term IDs
> ---
>
> Key: SPARK-12488
> URL: https://issues.apache.org/jira/browse/SPARK-12488
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib
>Affects Versions: 1.5.2
>Reporter: Ilya Ganelin
>
> When running the LDA model, and using the describeTopics function, invalid 
> values appear in the termID list that is returned:
> The below example generates 10 topics on a data set with a vocabulary of 685 terms.
> {code}
> // Set LDA parameters
> val numTopics = 10
> val lda = new LDA().setK(numTopics).setMaxIterations(10)
> val ldaModel = lda.run(docTermVector)
> val distModel = 
> ldaModel.asInstanceOf[org.apache.spark.mllib.clustering.DistributedLDAModel]
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted.reverse
> res40: Array[Int] = Array(2064860663, 2054149956, 1991041659, 1986948613, 
> 1962816105, 1858775243, 1842920256, 1799900935, 1792510791, 1792371944, 
> 1737877485, 1712816533, 1690397927, 1676379181, 1664181296, 1501782385, 
> 1274389076, 1260230987, 1226545007, 1213472080, 1068338788, 1050509279, 
> 714524034, 678227417, 678227086, 624763822, 624623852, 618552479, 616917682, 
> 551612860, 453929488, 371443786, 183302140, 58762039, 42599819, 9947563, 617, 
> 616, 615, 612, 603, 597, 596, 595, 594, 593, 592, 591, 590, 589, 588, 587, 
> 586, 585, 584, 583, 582, 581, 580, 579, 578, 577, 576, 575, 574, 573, 572, 
> 571, 570, 569, 568, 567, 566, 565, 564, 563, 562, 561, 560, 559, 558, 557, 
> 556, 555, 554, 553, 552, 551, 550, 549, 548, 547, 546, 545, 544, 543, 542, 
> 541, 540, 539, 538, 537, 536, 535, 534, 533, 532, 53...
> {code}
> {code}
> scala> ldaModel.describeTopics()(0)._1.sorted
> res41: Array[Int] = Array(-2087809139, -2001127319, -1979718998, -1833443915, 
> -1811530305, -1765302237, -1668096260, -1527422175, -1493838005, -1452770216, 
> -1452508395, -1452502074, -1452277147, -1451720206, -1450928740, -1450237612, 
> -1448730073, -1437852514, -1420883015, -1418557080, -1397997340, -1397995485, 
> -1397991169, -1374921919, -1360937376, -1360533511, -1320627329, -1314475604, 
> -1216400643, -1210734882, -1107065297, -1063529036, -1062984222, -1042985412, 
> -1009109620, -951707740, -894644371, -799531743, -627436045, -586317106, 
> -563544698, -326546674, -174108802, -155900771, -80887355, -78916591, 
> -26690004, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
> 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 
> 38, 39, 40, 41, 42, 43, 44, 45, 4...
> {code}






[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628488#comment-14628488
 ] 

Ilya Ganelin commented on SPARK-8890:
-

[~rxin] I want to make sure I correctly understand your solution. Are you 
proposing that if the number of active partitions is beyond 50 we repartition 
the data into 50 partitions? 

I think we could approach this differently by creating a pool of OutputWriters 
(of size 50) and only creating a new OutputWriter once a previous partition has 
been fully written. This could be handled by blocking within the 
outputWriterForRow call when the new OutputWriter is created. 

Does that seem reasonable? Please let me know, thanks!
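A minimal, Spark-free sketch of the pooled-writer idea (all names here are illustrative, not Spark's actual OutputWriter API; for simplicity the pool closes the oldest open writer when full instead of blocking):

```scala
import scala.collection.mutable

// Illustrative stand-in for Spark's OutputWriter; not the real API.
trait Writer { def write(row: String): Unit; def close(): Unit }

// Bounded pool of at most `maxOpen` writers. When a new partition needs a
// writer and the pool is full, the oldest open writer is closed first
// (a single-threaded simplification of "block until a partition finishes").
class WriterPool(maxOpen: Int, newWriter: String => Writer) {
  private val open = mutable.LinkedHashMap.empty[String, Writer]

  def writerFor(partition: String): Writer =
    open.getOrElse(partition, {
      if (open.size >= maxOpen) {
        val (oldest, w) = open.head  // insertion order, so oldest first
        w.close()
        open.remove(oldest)
      }
      val w = newWriter(partition)
      open.put(partition, w)
      w
    })

  def closeAll(): Unit = { open.values.foreach(_.close()); open.clear() }
  def openCount: Int = open.size
}
```

Requesting a third partition from a pool of size two closes the first writer, so at most two writers are ever live at once.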

 Reduce memory consumption for dynamic partition insert
 --

 Key: SPARK-8890
 URL: https://issues.apache.org/jira/browse/SPARK-8890
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin
Priority: Critical

 Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
 table partitions is large. The problem is that we open one output writer for 
 each partition, and when data are randomized and when the number of 
 partitions is large, we open a large number of output writers, leading to OOM.
 The solution here is to inject a sorting operation once the number of active 
 partitions is beyond a certain point (e.g. 50?)






[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628533#comment-14628533
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 6:46 PM:
--

Once data is sorted, is the number of partitions guaranteed to be under that 
limit? When we're talking about sorting, are we talking about which columns are 
in which partition?

I want to make sure I understand what is happening. When we ingest a data 
frame, we consume a set of data organized by columns (the schema). When this 
data is partitioned, does all data under a certain column go to the same 
partition? If not, what happens in this stage?

We create a new outputWriter for each row based on the columns within that row 
(from the projected columns). New outputWriters become necessary when the 
columns within a row are different. However, given that the schema is fixed, 
where does this variability come from and what does it mean to sort in this 
context? 


was (Author: ilganeli):
Once data is sorted, is the number of partitions guaranteed to be under that 
limit? When we're talking about sorting, are we talking about which columns are 
in which partition?

I want to make sure I understand what is happening. When we ingest a data 
frame, we consume a set of data organized by columns (the schema). When this 
data is partitioned, does all data under a certain column go to the same 
partition? If not, what happens in this stage?

We create a new ```outputWriter``` for each row based on the columns within 
that row (from the projected columns). New ```outputWriters``` become necessary 
when the columns within a row are different. However, given that the schema is 
fixed, where does this variability come from and what does it mean to sort in 
this context? 

 Reduce memory consumption for dynamic partition insert
 --

 Key: SPARK-8890
 URL: https://issues.apache.org/jira/browse/SPARK-8890
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin
Priority: Critical

 Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
 table partitions is large. The problem is that we open one output writer for 
 each partition, and when data are randomized and when the number of 
 partitions is large, we open a large number of output writers, leading to OOM.
 The solution here is to inject a sorting operation once the number of active 
 partitions is beyond a certain point (e.g. 50?)






[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628533#comment-14628533
 ] 

Ilya Ganelin commented on SPARK-8890:
-

Once data is sorted, is the number of partitions guaranteed to be under that 
limit? When we're talking about sorting, are we talking about which columns are 
in which partition?

I want to make sure I understand what is happening. When we ingest a data 
frame, we consume a set of data organized by columns (the schema). When this 
data is partitioned, does all data under a certain column go to the same 
partition? If not, what happens in this stage?

We create a new ```outputWriter``` for each row based on the columns within 
that row (from the projected columns). New ```outputWriters``` become necessary 
when the columns within a row are different. However, given that the schema is 
fixed, where does this variability come from and what does it mean to sort in 
this context? 

 Reduce memory consumption for dynamic partition insert
 --

 Key: SPARK-8890
 URL: https://issues.apache.org/jira/browse/SPARK-8890
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin
Priority: Critical

 Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
 table partitions is large. The problem is that we open one output writer for 
 each partition, and when data are randomized and when the number of 
 partitions is large, we open a large number of output writers, leading to OOM.
 The solution here is to inject a sorting operation once the number of active 
 partitions is beyond a certain point (e.g. 50?)






[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:55 PM:
--

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create a new outputWriter for each possible key.
2) When the maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of InternalRow 
objects in memory).
4) One key at a time, create an outputWriter and write all rows associated with 
that key.
5) Close the outputWriter for that key, open a new outputWriter, and continue 
from step 4. 



was (Author: ilganeli):
[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of {InternalRow} 
objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


 Reduce memory consumption for dynamic partition insert
 --

 Key: SPARK-8890
 URL: https://issues.apache.org/jira/browse/SPARK-8890
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin
Priority: Critical

 Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
 table partitions is large. The problem is that we open one output writer for 
 each partition, and when data are randomized and when the number of 
 partitions is large, we open a large number of output writers, leading to OOM.
 The solution here is to inject a sorting operation once the number of active 
 partitions is beyond a certain point (e.g. 50?)






[jira] [Commented] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin commented on SPARK-8890:
-

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create a new outputWriter for each possible key.
2) When the maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of 
{code}InternalRow{code} objects in memory).
4) One key at a time, create an outputWriter and write all rows associated with 
that key.
5) Close the outputWriter for that key, open a new outputWriter, and continue 
from step 4. 
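The steps above can be sketched, single-threaded and without Spark, as follows (the `write` callback stands in for creating an OutputWriter for a key, writing its rows, and closing it; the row type and all names are illustrative):

```scala
import scala.collection.mutable

// Simulates the proposed flow: open up to `maxOpenWriters` partitions,
// set aside rows for any further keys, then sort the remainder by key and
// write it one key at a time so only a single writer is ever open.
def dynamicPartitionWrite(
    rows: Iterator[(Int, String)],
    maxOpenWriters: Int,
    write: (Int, Seq[String]) => Unit): Unit = {
  val open = mutable.LinkedHashMap.empty[Int, mutable.Buffer[String]]
  val overflow = mutable.Buffer.empty[(Int, String)]

  // Steps 1-2: buffer rows per key until the writer limit is reached.
  rows.foreach { case (k, v) =>
    if (open.contains(k) || open.size < maxOpenWriters)
      open.getOrElseUpdate(k, mutable.Buffer.empty[String]) += v
    else
      overflow += ((k, v))  // too many keys: stop opening writers
  }
  // Flush and close the writers opened in step 1.
  open.foreach { case (k, vs) => write(k, vs.toSeq) }

  // Steps 3-5: sort the remaining rows by key, then handle one key at a time.
  overflow.groupBy(_._1).toSeq.sortBy(_._1).foreach { case (k, kvs) =>
    write(k, kvs.map(_._2).toSeq)
  }
}
```

Each key is written exactly once, and the overflow keys are emitted in sorted order after the initially open partitions.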


 Reduce memory consumption for dynamic partition insert
 --

 Key: SPARK-8890
 URL: https://issues.apache.org/jira/browse/SPARK-8890
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin
Priority: Critical

 Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
 table partitions is large. The problem is that we open one output writer for 
 each partition, and when data are randomized and when the number of 
 partitions is large, we open a large number of output writers, leading to OOM.
 The solution here is to inject a sorting operation once the number of active 
 partitions is beyond a certain point (e.g. 50?)






[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:55 PM:
--

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create a new outputWriter for each possible key.
2) When the maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of {InternalRow} 
objects in memory).
4) One key at a time, create an outputWriter and write all rows associated with 
that key.
5) Close the outputWriter for that key, open a new outputWriter, and continue 
from step 4. 



was (Author: ilganeli):
[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of 
{code}InternalRow{code} objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


 Reduce memory consumption for dynamic partition insert
 --

 Key: SPARK-8890
 URL: https://issues.apache.org/jira/browse/SPARK-8890
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin
Priority: Critical

 Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
 table partitions is large. The problem is that we open one output writer for 
 each partition, and when data are randomized and when the number of 
 partitions is large, we open a large number of output writers, leading to OOM.
 The solution here is to inject a sorting operation once the number of active 
 partitions is beyond a certain point (e.g. 50?)






[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:57 PM:
--

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create a new outputWriter for each possible key.
2) When the maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of 
{{InternalRow}} objects in memory).
4) One key at a time, create an outputWriter and write all rows associated with 
that key.
5) Close the outputWriter for that key, open a new outputWriter, and continue 
from step 4. 



was (Author: ilganeli):
[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of InternalRow 
objects in memory. 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


 Reduce memory consumption for dynamic partition insert
 --

 Key: SPARK-8890
 URL: https://issues.apache.org/jira/browse/SPARK-8890
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin
Priority: Critical

 Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
 table partitions is large. The problem is that we open one output writer for 
 each partition, and when data are randomized and when the number of 
 partitions is large, we open a large number of output writers, leading to OOM.
 The solution here is to inject a sorting operation once the number of active 
 partitions is beyond a certain point (e.g. 50?)






[jira] [Comment Edited] (SPARK-8890) Reduce memory consumption for dynamic partition insert

2015-07-15 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628620#comment-14628620
 ] 

Ilya Ganelin edited comment on SPARK-8890 at 7/15/15 7:57 PM:
--

[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with {{key=1}}, then close that output 
buffer, write the next one etc.

Operational flow would become:
1) Attempt to create a new {{outputWriter}} for each possible {{key}}.
2) When the maximum is exceeded, stop outputting rows.
3) Sort all remaining data by {{key}} (and persist this sorted set of 
{{InternalRow}} objects in memory).
4) One key at a time, create an {{outputWriter}} and write all rows associated 
with that key.
5) Close the {{outputWriter}} for that {{key}}, open a new {{outputWriter}}, 
and continue from step 4. 



was (Author: ilganeli):
[~yhuai] That makes sense, thank you. Wouldn't we still need to close/delete 
output buffers for keys that have been completely written? Thus, would we, for 
example, write all values associated with key=1, then close that output buffer, 
write the next one etc.

Operational flow would become:
1) Attempt to create  new outputWriter for each possible key
2) When maximum is exceeded, stop outputting rows.
3) Sort all remaining data by key (and persist this sorted set of 
{{InternalRow}} objects in memory). 
4) One key at a time, create an outputWriter and write all rows associated with 
that key
5) Close outputWriter for that key and open a new outputWriter, continue from 
step 4. 


 Reduce memory consumption for dynamic partition insert
 --

 Key: SPARK-8890
 URL: https://issues.apache.org/jira/browse/SPARK-8890
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin
Priority: Critical

 Currently, InsertIntoHadoopFsRelation can run out of memory if the number of 
 table partitions is large. The problem is that we open one output writer for 
 each partition, and when data are randomized and when the number of 
 partitions is large, we open a large number of output writers, leading to OOM.
 The solution here is to inject a sorting operation once the number of active 
 partitions is beyond a certain point (e.g. 50?)






[jira] [Commented] (SPARK-8907) Speed up path construction in DynamicPartitionWriterContainer.outputWriterForRow

2015-07-13 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625150#comment-14625150
 ] 

Ilya Ganelin commented on SPARK-8907:
-

[~rxin] The code for this in master has eliminated usage of zip and map as of 
[SPARK-8961|https://github.com/apache/spark/commit/33630883685eafcc3ee4521ea8363be342f6e6b4].
 Do you think this can be further optimized and if so, how? There doesn't seem 
to be much within the existing catalyst expressions that would facilitate this, 
but I could be wrong. 

The relevant code fragment is below:
{code}
val partitionPath = {
  val partitionPathBuilder = new StringBuilder
  var i = 0

  while (i < partitionColumns.length) {
    val col = partitionColumns(i)
    val partitionValueString = {
      val string = row.getString(i)
      if (string.eq(null)) defaultPartitionName
      else PartitioningUtils.escapePathName(string)
    }

    if (i > 0) {
      partitionPathBuilder.append(Path.SEPARATOR_CHAR)
    }

    partitionPathBuilder.append(s"$col=$partitionValueString")
    i += 1
  }

  partitionPathBuilder.toString()
}
{code}

 Speed up path construction in 
 DynamicPartitionWriterContainer.outputWriterForRow
 

 Key: SPARK-8907
 URL: https://issues.apache.org/jira/browse/SPARK-8907
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin

 Don't use zip and scala collection methods to avoid garbage collection
 {code}
 val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, 
 rawValue) =>
   val string = if (rawValue == null) null else String.valueOf(rawValue)
   val valueString = if (string == null || string.isEmpty) {
 defaultPartitionName
   } else {
 PartitioningUtils.escapePathName(string)
   }
   s"/$col=$valueString"
 }.mkString.stripPrefix(Path.SEPARATOR)
 {code}
 We can probably use catalyst expressions themselves to construct the path, 
 and then we can leverage code generation to do this.






[jira] [Commented] (SPARK-8464) Consider separating aggregator and non-aggregator paths in ExternalSorter

2015-06-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608292#comment-14608292
 ] 

Ilya Ganelin commented on SPARK-8464:
-

Josh - I'd be happy to look into this, I'll submit a PR shortly.

 Consider separating aggregator and non-aggregator paths in ExternalSorter
 -

 Key: SPARK-8464
 URL: https://issues.apache.org/jira/browse/SPARK-8464
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Josh Rosen

 ExternalSorter is still really complicated and hard to understand.  We should 
 investigate whether separating the aggregator and non-aggregator paths into 
 separate files would make the code easier to understand without introducing 
 significant duplication.






[jira] [Commented] (SPARK-3153) shuffle will run out of space when disks have different free space

2015-06-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608993#comment-14608993
 ] 

Ilya Ganelin commented on SPARK-3153:
-

cc [~davies] I believe this issue can be closed as a duplicate of SPARK-5418, 
no?

 shuffle will run out of space when disks have different free space
 --

 Key: SPARK-3153
 URL: https://issues.apache.org/jira/browse/SPARK-3153
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Reporter: Davies Liu

 If we have several disks in SPARK_LOCAL_DIRS, and one of them is much smaller 
 than the others (perhaps added by mistake, or a special disk such as an SSD), 
 then the shuffle will run out of space on that smaller disk.
 PySpark also has this issue during spilling.






[jira] [Commented] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin commented on SPARK-7894:
-

How is this functionality different from the existing ```union``` functions 
within ```VertexRDD``` and ```EdgeRDD``` ?

 Graph Union Operator
 

 Key: SPARK-7894
 URL: https://issues.apache.org/jira/browse/SPARK-7894
 Project: Spark
  Issue Type: Sub-task
  Components: GraphX
Reporter: Andy Huang
  Labels: graph, union
 Attachments: union_operator.png


 This operator aims to union two graphs and generate a new graph directly. The 
 union of two graphs is the union of their vertex sets and their edge 
 families. Vertices and edges included in either graph will be part 
 of the new graph.
 bq.  G ∪ H = (V(G) ∪ V(H), E(G) ∪ E(H)).
 The image below shows a union of graph G and graph H.
 !union_operator.png|width=600px,align=center!
 A simple interface would be:
 bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED]
 However, overlapping vertices and edges between the borders of the two graphs 
 are inevitable. For vertices, it is natural to take the union and 
 remove the duplicates. But for edges, a mergeEdges function seems 
 more reasonable.
 bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: 
 (ED, ED) => ED): Graph[VD, ED]
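The proposed semantics can be sketched without Spark, with a graph reduced to plain collections (SimpleGraph and its fields are illustrative, not GraphX API; in GraphX the overlap handling would need something like a reduceByKey over the unioned EdgeRDDs):

```scala
// Vertices keyed by id, edges keyed by (src, dst).
case class SimpleGraph[VD, ED](vertices: Map[Long, VD], edges: Map[(Long, Long), ED]) {
  def union(other: SimpleGraph[VD, ED])(mergeEdges: (ED, ED) => ED): SimpleGraph[VD, ED] = {
    // Vertex sets: plain union; on a duplicate id, `other`'s attribute wins
    // (one possible policy; the proposal leaves vertex merging open).
    val vs = vertices ++ other.vertices
    // Edge sets: edges present in both graphs are combined with mergeEdges.
    val es = other.edges.foldLeft(edges) { case (acc, (k, ed)) =>
      acc.updated(k, acc.get(k).map(mergeEdges(_, ed)).getOrElse(ed))
    }
    SimpleGraph(vs, es)
  }
}
```

For example, unioning two graphs that share the edge (1, 2) with `_ + _` as mergeEdges sums that edge's attributes while simply unioning the vertex sets.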






[jira] [Comment Edited] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:35 PM:
-

How is this functionality different from the existing {code}union{code} 
functions within {code}VertexRDD{code} and {code}EdgeRDD{code} ?


was (Author: ilganeli):
How is this functionality different from the existing ```union``` functions 
within ```VertexRDD``` and ```EdgeRDD``` ?

 Graph Union Operator
 

 Key: SPARK-7894
 URL: https://issues.apache.org/jira/browse/SPARK-7894
 Project: Spark
  Issue Type: Sub-task
  Components: GraphX
Reporter: Andy Huang
  Labels: graph, union
 Attachments: union_operator.png


 This operator aims to union two graphs and generate a new graph directly. The 
 union of two graphs is the union of their vertex sets and their edge 
 families. Vertices and edges included in either graph will be part 
 of the new graph.
 bq.  G ∪ H = (V(G) ∪ V(H), E(G) ∪ E(H)).
 The image below shows a union of graph G and graph H.
 !union_operator.png|width=600px,align=center!
 A simple interface would be:
 bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED]
 However, overlapping vertices and edges between the borders of the two graphs 
 are inevitable. For vertices, it is natural to take the union and 
 remove the duplicates. But for edges, a mergeEdges function seems 
 more reasonable.
 bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: 
 (ED, ED) => ED): Graph[VD, ED]






[jira] [Comment Edited] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:37 PM:
-

How is this functionality different from the existing `union` functions within 
`VertexRDD` and `EdgeRDD` ?


was (Author: ilganeli):
How is this functionality different from the existing {union} functions within 
{VertexRDD} and {EdgeRDD} ?

 Graph Union Operator
 

 Key: SPARK-7894
 URL: https://issues.apache.org/jira/browse/SPARK-7894
 Project: Spark
  Issue Type: Sub-task
  Components: GraphX
Reporter: Andy Huang
  Labels: graph, union
 Attachments: union_operator.png


 This operator aims to union two graphs and generate a new graph directly. The 
 union of two graphs is the union of their vertex sets and their edge 
 families. Vertices and edges included in either graph will be part 
 of the new graph.
 bq.  G ∪ H = (V(G) ∪ V(H), E(G) ∪ E(H)).
 The image below shows a union of graph G and graph H.
 !union_operator.png|width=600px,align=center!
 A simple interface would be:
 bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED]): Graph[VD, ED]
 However, overlapping vertices and edges between the borders of the two graphs 
 are inevitable. For vertices, it is natural to take the union and 
 remove the duplicates. But for edges, a mergeEdges function seems 
 more reasonable.
 bq. def union[VD: ClassTag, ED: ClassTag](other: Graph[VD, ED], mergeEdges: 
 (ED, ED) => ED): Graph[VD, ED]






[jira] [Comment Edited] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:38 PM:
-

How is this functionality different from the existing {{union}} functions 
within {{VertexRDD}} and {{EdgeRDD}} ?


was (Author: ilganeli):
How is this functionality different from the existing `union` functions within 
`VertexRDD` and `EdgeRDD` ?




[jira] [Comment Edited] (SPARK-7894) Graph Union Operator

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579283#comment-14579283
 ] 

Ilya Ganelin edited comment on SPARK-7894 at 6/9/15 5:35 PM:
-

How is this functionality different from the existing {union} functions within 
{VertexRDD} and {EdgeRDD} ?


was (Author: ilganeli):
How is this functionality different from the existing {code}union{code} 
functions within {code}VertexRDD{code} and {code}EdgeRDD{code} ?




[jira] [Commented] (SPARK-7996) Deprecate the developer api SparkEnv.actorSystem

2015-06-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579643#comment-14579643
 ] 

Ilya Ganelin commented on SPARK-7996:
-

This seems to be mutually exclusive with 
https://issues.apache.org/jira/browse/SPARK-7997. Is the latter a placeholder?

 Deprecate the developer api SparkEnv.actorSystem
 

 Key: SPARK-7996
 URL: https://issues.apache.org/jira/browse/SPARK-7996
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Reporter: Shixiong Zhu








[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-05 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847
 ] 

Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 5:18 PM:
-

[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 

With regards to also supporting a string for simple types, I think it's safer 
to enforce usage of DataType since I think the intent is for the SQL schema to 
be strictly typed. Were you suggesting that we allow passing int or long as 
the type argument or for us to infer it automatically by parsing the string? 
That approach seems a little more dangerous.


was (Author: ilganeli):
[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 

With regards to also supporting a string for simple types, I think it's safer 
to enforce usage of DataType since the SQL schema should be strictly typed. 
Were you suggesting that we allow passing int or long as the type argument 
or for us to infer it automatically by parsing the string? That approach seems 
a little more dangerous.

 Design an easier way to construct schema for both Scala and Python
 --

 Key: SPARK-8056
 URL: https://issues.apache.org/jira/browse/SPARK-8056
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Reporter: Reynold Xin

 StructType is fairly hard to construct, especially in Python.






[jira] [Commented] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-05 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847
 ] 

Ilya Ganelin commented on SPARK-8056:
-

[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 




[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-05 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14574847#comment-14574847
 ] 

Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 5:17 PM:
-

[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 

With regards to also supporting a string for simple types, I think it's safer 
to enforce usage of DataType since the SQL schema should be strictly typed. 
Were you suggesting that we allow passing int or long as the type argument 
or for us to infer it automatically by parsing the string? That approach seems 
a little more dangerous.


was (Author: ilganeli):
[~rxin] Sounds good :). Where would you suggest adding a test for StructType 
creation? Not sure where it quite fits in the grand scheme of things. 




[jira] [Commented] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-04 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573874#comment-14573874
 ] 

Ilya Ganelin commented on SPARK-8056:
-

[~rxin] Are you actively working on this? I think this could be readily solved 
by providing an interface to construct StructType the way we construct 
SparkConf, e.g.
new StructType().add(f1, v1).add(f2, v2), etc.
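
A minimal sketch of that chained-builder idea, using plain Scala stand-ins 
rather than the real Spark SQL classes (the StructType/StructField names below 
are illustrative, and dataType is just a string for brevity):

```scala
// Illustrative stand-ins for Spark SQL's schema classes, showing how a
// SparkConf-style fluent add() could make schema construction easier.
case class StructField(name: String, dataType: String, nullable: Boolean = true)

case class StructType(fields: Vector[StructField] = Vector.empty) {
  // add() returns a new StructType, so calls chain like SparkConf.set(...).
  def add(name: String, dataType: String): StructType =
    StructType(fields :+ StructField(name, dataType))
}

val schema = StructType().add("id", "long").add("name", "string")
```

Because each add() returns a fresh value, a schema can be built up 
incrementally without any mutable state or nested constructor calls.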




[jira] [Comment Edited] (SPARK-8056) Design an easier way to construct schema for both Scala and Python

2015-06-04 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573874#comment-14573874
 ] 

Ilya Ganelin edited comment on SPARK-8056 at 6/5/15 12:35 AM:
--

[~rxin] Are you actively working on this? I think this could be readily solved 
by providing an interface to construct StructType the way we construct 
SparkConf, e.g.
new StructType().add(f1, v1).add(f2, v2), etc.


was (Author: ilganeli):
[~rxin] Are you actively working on this? I think this could be readily solved 
by providing interface to construct StructType the way we construct SparkConf, 
e.g.
new StructType().add(f1,v1).add(f1,v2) etc




[jira] [Closed] (SPARK-6746) Refactor large functions in DAGScheduler to improve readability

2015-06-04 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin closed SPARK-6746.
---
Resolution: Won't Fix

 Refactor large functions in DAGScheduler to improve readability
 ---

 Key: SPARK-6746
 URL: https://issues.apache.org/jira/browse/SPARK-6746
 Project: Spark
  Issue Type: Sub-task
  Components: Scheduler
Reporter: Ilya Ganelin

 The DAGScheduler class contains two huge functions that make it 
 very hard to understand what's going on in the code. These are:
 1) The monolithic handleTaskCompletion 
 2) The cleanupStateForJobAndIndependentStages function
 These can be simply modularized to eliminate some awkward type casting and 
 improve code readability. 






[jira] [Commented] (SPARK-7075) Project Tungsten: Improving Physical Execution and Memory Management

2015-04-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521944#comment-14521944
 ] 

Ilya Ganelin commented on SPARK-7075:
-

This looks like the result of a large internal Databricks effort - are there 
pieces of this where you could use external help or is this issue in place 
primarily to document migration of internal code?

 Project Tungsten: Improving Physical Execution and Memory Management
 

 Key: SPARK-7075
 URL: https://issues.apache.org/jira/browse/SPARK-7075
 Project: Spark
  Issue Type: Epic
  Components: Block Manager, Shuffle, Spark Core, SQL
Reporter: Reynold Xin
Assignee: Reynold Xin

 Based on our observation, the majority of Spark workloads are not bottlenecked 
 by I/O or network, but rather by CPU and memory. This project focuses on three 
 areas to improve the efficiency of memory and CPU use in Spark applications, 
 pushing performance closer to the limits of the underlying hardware.
 1. Memory Management and Binary Processing: leveraging application semantics 
 to manage memory explicitly and eliminate the overhead of JVM object model 
 and garbage collection
 2. Cache-aware computation: algorithms and data structures to exploit memory 
 hierarchy
 3. Code generation: using code generation to exploit modern compilers and CPUs
 Several parts of project Tungsten leverage the DataFrame model, which gives 
 us more semantics about the application. We will also retrofit the 
 improvements onto Spark’s RDD API whenever possible.






[jira] [Commented] (SPARK-1021) sortByKey() launches a cluster job when it shouldn't

2015-04-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511230#comment-14511230
 ] 

Ilya Ganelin commented on SPARK-1021:
-

I'd be happy to look into this and 
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-4514 .

 sortByKey() launches a cluster job when it shouldn't
 

 Key: SPARK-1021
 URL: https://issues.apache.org/jira/browse/SPARK-1021
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 0.8.0, 0.9.0, 1.0.0, 1.1.0
Reporter: Andrew Ash
Assignee: Erik Erlandson
  Labels: starter

 The sortByKey() method is listed as a transformation, not an action, in the 
 documentation.  But it launches a cluster job regardless.
 http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html
 Some discussion on the mailing list suggested that this is a problem with the 
 rdd.count() call inside Partitioner.scala's rangeBounds method.
 https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L102
 Josh Rosen suggests that rangeBounds should be made into a lazy variable:
 {quote}
 I wonder whether making RangePartitioner.rangeBounds into a lazy val would 
 fix this 
 (https://github.com/apache/incubator-spark/blob/6169fe14a140146602fb07cfcd13eee6efad98f9/core/src/main/scala/org/apache/spark/Partitioner.scala#L95).
   We'd need to make sure that rangeBounds() is never called before an action 
 is performed.  This could be tricky because it's called in the 
 RangePartitioner.equals() method.  Maybe it's sufficient to just compare the 
 number of partitions, the ids of the RDDs used to create the 
 RangePartitioner, and the sort ordering.  This still supports the case where 
 I range-partition one RDD and pass the same partitioner to a different RDD.  
 It breaks support for the case where two range partitioners created on 
 different RDDs happened to have the same rangeBounds(), but it seems unlikely 
 that this would really harm performance since it's probably unlikely that the 
 range partitioners are equal by chance.
 {quote}
 Can we please make this happen?  I'll send a PR on GitHub to start the 
 discussion and testing.






[jira] [Comment Edited] (SPARK-4514) SparkContext localProperties does not inherit property updates across thread reuse

2015-04-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511537#comment-14511537
 ] 

Ilya Ganelin edited comment on SPARK-4514 at 4/24/15 7:37 PM:
--

[~joshrosen] - given your work on SPARK-6629, is this still relevant? I saw 
that there was a comment there stating that issue may not be a problem. I can 
knock this one out if it's still necessary.


was (Author: ilganeli):
[~joshrosen] - given your work on SPARK-6629 is this still relevant - I saw 
that there was a comment there stating that issue may not be a problem? I can 
knock this one out if it's still necessary.

 SparkContext localProperties does not inherit property updates across thread 
 reuse
 --

 Key: SPARK-4514
 URL: https://issues.apache.org/jira/browse/SPARK-4514
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0, 1.1.1, 1.2.0
Reporter: Erik Erlandson
Assignee: Josh Rosen
Priority: Critical

 The current job group id of a Spark context is stored in the 
 {{localProperties}} member value.   This data structure is designed to be 
 thread local, and its settings are not preserved when {{ComplexFutureAction}} 
 instantiates a new {{Future}}.  
 One consequence of this is that {{takeAsync()}} does not behave in the same 
 way as other async actions, e.g. {{countAsync()}}.  For example, this test 
 (if copied into StatusTrackerSuite.scala), will fail, because 
 {{my-job-group2}} is not propagated to the Future which actually 
 instantiates the job:
 {code:java}
   test("getJobIdsForGroup() with takeAsync()") {
 sc = new SparkContext("local", "test", new SparkConf(false))
 sc.setJobGroup("my-job-group2", "description")
 sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty)
 val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
 val firstJobId = eventually(timeout(10 seconds)) {
   firstJobFuture.jobIds.head
 }
 eventually(timeout(10 seconds)) {
   sc.statusTracker.getJobIdsForGroup("my-job-group2") should be 
 (Seq(firstJobId))
 }
   }
 {code}
 It also impacts current PR for SPARK-1021, which involves additional uses of 
 {{ComplexFutureAction}}.
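 The underlying mechanism is easy to reproduce without Spark: thread-local 
 state is copied into a thread when the thread is created, not when a task is 
 submitted, so a reused pool thread never sees later updates. A minimal 
 JVM-level sketch (the names here are illustrative, not Spark internals):

```scala
import java.util.concurrent.{Callable, Executors}

// Demonstrates why thread-local properties do not survive thread reuse:
// an InheritableThreadLocal is copied at thread *creation* time, so a pool
// thread created before props.set(...) never observes the update.
object ThreadReuseSketch {
  val props = new InheritableThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  def run(): (String, String) = {
    val pool = Executors.newFixedThreadPool(1)
    // Warm up the pool so its single worker thread exists before we set props.
    pool.submit(new Runnable { def run(): Unit = () }).get()
    props.set("my-job-group2")
    // The reused worker thread still sees the initial value, not the update.
    val seenByPool = pool.submit(new Callable[String] {
      def call(): String = props.get()
    }).get()
    pool.shutdown()
    (props.get(), seenByPool)
  }
}
```

 The submitting thread sees its own update while the reused pool thread does 
 not, which is exactly the shape of the takeAsync() problem described above.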






[jira] [Commented] (SPARK-4514) SparkContext localProperties does not inherit property updates across thread reuse

2015-04-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511537#comment-14511537
 ] 

Ilya Ganelin commented on SPARK-4514:
-

[~joshrosen] - given your work on SPARK-6629, is this still relevant? I saw 
that there was a comment there stating that issue may not be a problem. I can 
knock this one out if it's still necessary.




[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException

2015-04-23 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14510154#comment-14510154
 ] 

Ilya Ganelin commented on SPARK-5945:
-

So to recap:
a) Move failure-count tracking into Stage
b) Reset the failure count on stage success, so even if that stage is re-submitted 
due to failures downstream, we never hit the cap
c) Remove the config parameter. 
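
A sketch of that recap in plain Scala (StageSketch is an illustrative 
stand-in, not Spark's Stage class, and the threshold of 4 is only an example): 
failures accumulate per stage, but a success resets the count, so only 
consecutive failures can reach the abort threshold:

```scala
// Illustrative stand-in for the proposed per-stage failure tracking.
class StageSketch(val id: Int, maxConsecutiveFailures: Int = 4) {
  private var failures = 0

  def onFetchFailure(): Unit = failures += 1   // (a) tracked inside the stage
  def onSuccess(): Unit = failures = 0         // (b) reset on success
  def shouldAbort: Boolean = failures >= maxConsecutiveFailures
}
```

With the reset in place, a stage that is re-submitted because of downstream 
failures never accumulates enough consecutive failures to hit the cap.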

 Spark should not retry a stage infinitely on a FetchFailedException
 ---

 Key: SPARK-5945
 URL: https://issues.apache.org/jira/browse/SPARK-5945
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Imran Rashid
Assignee: Ilya Ganelin

 While investigating SPARK-5928, I noticed some very strange behavior in the 
 way spark retries stages after a FetchFailedException.  It seems that on a 
 FetchFailedException, instead of simply killing the task and retrying, Spark 
 aborts the stage and retries.  If it just retried the task, the task might 
 fail 4 times and then trigger the usual job killing mechanism.  But by 
 killing the stage instead, the max retry logic is skipped (it looks to me 
 like there is no limit for retries on a stage).
 After a bit of discussion with Kay Ousterhout, it seems the idea is that if a 
 fetch fails, we assume that the block manager we are fetching from has 
 failed, and that it will succeed if we retry the stage w/out that block 
 manager.  In that case, it wouldn't make any sense to retry the task, since 
 it's doomed to fail every time, so we might as well kill the whole stage.  But 
 this raises two questions:
 1) Is it really safe to assume that a FetchFailedException means that the 
 BlockManager has failed, and it will work if we just try another one?  
 SPARK-5928 shows that there are at least some cases where that assumption is 
 wrong.  Even if we fix that case, this logic seems brittle to the next case 
 we find.  I guess the idea is that this behavior is what gives us the R in 
 RDD ... but it seems like it's not really that robust and maybe should be 
 reconsidered.
 2) Should stages only be retried a limited number of times?  It would be 
 pretty easy to put in a limited number of retries per stage.  Though again, 
 we encounter issues with keeping things resilient.  Theoretically one stage 
 could have many retries, but due to failures in different stages further 
 downstream, so we might need to track the cause of each retry as well to 
 still have the desired behavior.
 In general it just seems there is some flakiness in the retry logic.  This is 
 the only reproducible example I have at the moment, but I vaguely recall 
 hitting other cases of strange behavior w/ retries when trying to run long 
 pipelines.  Eg., if one executor is stuck in a GC during a fetch, the fetch 
 fails, but the executor eventually comes back and the stage gets retried 
 again, but the same GC issues happen the second time around, etc.
 Copied from SPARK-5928, here's the example program that can regularly produce 
 a loop of stage failures.  Note that it will only fail from a remote fetch, 
 so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell 
 --num-executors 2 --executor-memory 4000m}}
 {code}
 val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { ignore =>
   val n = 3e3.toInt
   val arr = new Array[Byte](n)
   // need to make sure the array doesn't compress to something small
   scala.util.Random.nextBytes(arr)
   arr
 }
 rdd.map { x => (1, x) }.groupByKey().count()
 {code}






[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException

2015-04-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508019#comment-14508019
 ] 

Ilya Ganelin commented on SPARK-5945:
-

[~kayousterhout] - thanks for the review. If I understand correctly, your 
suggestion would still address [~imranr]'s second comment, since the first stage 
would always (or mostly) succeed; i.e. it wouldn't have N consecutive failures, 
so even if subsequent stages fail, those wouldn't count towards the failure 
count for this particular stage, since the count would have been reset when the 
stage succeeded. 

Do you have any thoughts on the first comment? Specifically, is retrying a 
stage likely to succeed at all or is it a waste of effort in the first place?





[jira] [Comment Edited] (SPARK-6891) ExecutorAllocationManager will request negative number executors

2015-04-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507886#comment-14507886
 ] 

Ilya Ganelin edited comment on SPARK-6891 at 4/22/15 8:59 PM:
--

[~meiyoula]
I'm running Spark 1.3 (from the released builds) and I tried your code in the 
spark shell on yarn as spark-shell --master yarn. I'm able to run it without 
issue, I can successfully run multiple calls to runSparkPi. 

Were you seeing this issue when running the trunk?  


was (Author: ilganeli):
[~meiyoula]
I'm running Spark 1.3 (from the released builds) and I tried your code in the 
spark shell on yarn as spark-shell --master. I'm able to run it without issue, 
I can successfully run multiple calls to runSparkPi. 

Were you seeing this issue when running the trunk?  

 ExecutorAllocationManager will request negative number executors
 

 Key: SPARK-6891
 URL: https://issues.apache.org/jira/browse/SPARK-6891
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: meiyoula
Priority: Critical
 Attachments: DynamicExecutorTest.scala


 Below is the exception:
15/04/14 10:10:18 ERROR Utils: Uncaught exception in thread 
 spark-dynamic-executor-allocation-0
 java.lang.IllegalArgumentException: Attempted to request a negative 
 number of executor(s) -1 from the cluster manager. Please specify a positive 
 number!
  at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:342)
  at 
 org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1170)
  at 
 org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
  at 
 org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
  at 
 org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
  at 
 org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
  at 
 org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
  at 
 org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
  at 
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1723)
  at 
 org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
  at 
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
  at 
 java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
  at 
 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
  at 
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
  at 
 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)
 Below are the configurations I set:
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors   0
spark.dynamicAllocation.initialExecutors3
spark.dynamicAllocation.maxExecutors7
spark.dynamicAllocation.executorIdleTimeout 30
spark.shuffle.service.enabled   true
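The failure mode is that the delta sent to the cluster manager can go negative when the desired total drops below the current count. A minimal sketch of the kind of clamping such a fix needs (hypothetical helper names, not Spark's actual ExecutorAllocationManager internals):

```scala
object AllocationGuard {
  // Bound a desired executor total to [min, max] and never below zero,
  // so the number passed to the cluster manager cannot be negative.
  // Purely illustrative; not Spark's actual allocation code.
  def boundedRequest(desired: Int, min: Int, max: Int): Int = {
    val target = math.min(math.max(desired, min), max)
    math.max(target, 0)
  }
}
```

With `spark.dynamicAllocation.minExecutors=0` and `maxExecutors=7`, a desired total of -1 would be clamped to 0 instead of being forwarded as-is.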



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6891) ExecutorAllocationManager will request negative number executors

2015-04-22 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14507886#comment-14507886
 ] 

Ilya Ganelin commented on SPARK-6891:
-

[~meiyoula]
I'm running Spark 1.3 (from the released builds) and I tried your code in the 
spark shell on yarn as spark-shell --master. I'm able to run it without issue 
and can successfully run multiple calls to runSparkPi. 

Were you seeing this issue when running the trunk?  

 ExecutorAllocationManager will request negative number executors
 

 Key: SPARK-6891
 URL: https://issues.apache.org/jira/browse/SPARK-6891
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: meiyoula
Priority: Critical
 Attachments: DynamicExecutorTest.scala





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6891) ExecutorAllocationManager will request negative number executors

2015-04-19 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14502322#comment-14502322
 ] 

Ilya Ganelin commented on SPARK-6891:
-

[~meiyoula] Any hints on reproducing this aside from your configuration? E.g. 
simple test code to execute?

 ExecutorAllocationManager will request negative number executors
 

 Key: SPARK-6891
 URL: https://issues.apache.org/jira/browse/SPARK-6891
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: meiyoula
Priority: Critical




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6932) A Prototype of Parameter Server

2015-04-17 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6932:

Labels:   (was: kjhghbg)

 A Prototype of Parameter Server
 ---

 Key: SPARK-6932
 URL: https://issues.apache.org/jira/browse/SPARK-6932
 Project: Spark
  Issue Type: New Feature
  Components: ML, MLlib, Spark Core
Reporter: Qiping Li


[jira] [Updated] (SPARK-6932) A Prototype of Parameter Server

2015-04-17 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6932:

Description: 
 h2. Introduction

As specified in 
[SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590], it would be very 
helpful to integrate a parameter server into Spark for machine learning 
algorithms, especially those with ultra-high-dimensional features. 

After carefully studying the design doc of [Parameter 
Servers|https://docs.google.com/document/d/1SX3nkmF41wFXAAIr9BgqvrHSS5mW362fJ7roBXJm06o/edit?usp=sharing]
 and the paper on [Factorbird|http://stanford.edu/~rezab/papers/factorbird.pdf], we 
propose a prototype of a Parameter Server on Spark (Ps-on-Spark), with several 
key design concerns:

* *User-friendly interface*
We carefully investigated most existing Parameter Server systems 
(including [petuum|http://petuum.github.io], [parameter 
server|http://parameterserver.org], and 
[paracel|https://github.com/douban/paracel]) and designed a user-friendly 
interface by absorbing the essence of all these systems. 

* *Prototype of distributed array*
IndexRDD (see 
[SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590]) doesn't seem to 
be a good option for a distributed array, because in most cases the number of 
key updates per second is not very high. 
So we implement a distributed HashMap to store the parameters, which can be 
easily extended for better performance.

* *Minimal code change*
A lot of effort went into avoiding changes to Spark core. Tasks 
that need the parameter server are still created and scheduled by Spark's 
scheduler. Tasks communicate with the parameter server through a client object, 
over *akka* or *netty*.

With all these concerns we propose the following architecture:

h2. Architecture

!https://cloud.githubusercontent.com/assets/1285855/7158179/f2d25cc4-e3a9-11e4-835e-89681596c478.jpg!

Data is stored in RDDs and partitioned across workers. During each iteration, 
each worker gets parameters from the parameter server, then computes new 
parameters based on the old parameters and the data in its partition. Finally, 
each worker pushes updated parameters back to the parameter server. A worker 
communicates with the parameter server through a parameter server client, which 
is initialized in the `TaskContext` of that worker.

The current implementation is based on YARN cluster mode, 
but it should not be a problem to transplant it to other modes. 

h3. Interface

We referred to existing parameter server systems (petuum, parameter server, 
paracel) when designing the parameter server's interface. 

*`PSClient` provides the following interface for workers to use:*

{code}

//  get parameter indexed by key from parameter server
def get[T](key: String): T

// get multiple parameters from parameter server
def multiGet[T](keys: Array[String]): Array[T]

// add `delta` to the parameter indexed by `key`;
// if there are multiple `delta`s to apply to the same parameter,
// use `reduceFunc` to reduce these `delta`s first.
def update[T](key: String, delta: T, reduceFunc: (T, T) => T): Unit

// update multiple parameters at the same time, using the same `reduceFunc`.
def multiUpdate[T](keys: Array[String], delta: Array[T],
    reduceFunc: (T, T) => T): Unit

// advance clock to indicate that current iteration is finished.
def clock(): Unit
 
// block until all workers have reached this line of code.
def sync(): Unit
{code}

*`PSContext` provides the following functions for use on the driver:*

{code}

// load parameters from existing rdd.
def loadPSModel[T](model: RDD[String, T]) 

// fetch parameters from parameter server to construct model.
def fetchPSModel[T](keys: Array[String]): Array[T]

{code} 

*A new function has been added to `RDD` to run parameter server tasks:*

{code}

// run the provided `func` on each partition of this RDD. 
// This function can use data of this partition(the first argument) 
// and a parameter server client(the second argument). 
// See the following Logistic Regression for an example.
def runWithPS[U: ClassTag](func: (Array[T], PSClient) => U): Array[U]
   
{code}
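As a sketch of the update semantics above, where deltas queued against the same key are reduced with `reduceFunc` before being applied, here is a toy single-process stand-in for the proposed `PSClient` (purely illustrative, not the actual Ps-on-Spark implementation):

```scala
import scala.collection.mutable

// Toy stand-in for the proposed PSClient, parameters as Doubles.
// Deltas buffered for one key are reduced with its reduceFunc and
// folded into the stored parameter when clock() is called.
class ToyPSClient {
  private val params   = mutable.Map.empty[String, Double].withDefaultValue(0.0)
  private val pending  = mutable.Map.empty[String, List[Double]]
  private val reducers = mutable.Map.empty[String, (Double, Double) => Double]

  def get(key: String): Double = params(key)

  // Buffer a delta for `key`; applied at the next clock().
  def update(key: String, delta: Double,
             reduceFunc: (Double, Double) => Double): Unit = {
    pending(key) = delta :: pending.getOrElse(key, Nil)
    reducers(key) = reduceFunc
  }

  // Advance the clock: reduce and apply all buffered deltas.
  def clock(): Unit = {
    for ((k, ds) <- pending) params(k) = params(k) + ds.reduce(reducers(k))
    pending.clear()
  }
}
```

In a real deployment the store would be partitioned across server nodes and `clock()`/`sync()` would coordinate workers; this toy only shows the delta-reduction contract.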

h2. Example

Here is an example of using our prototype to implement logistic regression:

{code:title=LogisticRegression.scala|borderStyle=solid}

def train(
sc: SparkContext,
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double): LogisticRegressionModel = {

// initialize weights
val numFeatures = input.map(_.features.size).first()
val initialWeights = new Array[Double](numFeatures)

// initialize parameter server context
val pssc = new PSContext(sc)

// load initialized weights into parameter server
val initialModelRDD = sc.parallelize(Array(("w", initialWeights)), 1)
pssc.loadPSModel(initialModelRDD)

// run logistic regression algorithm on input data   
input.runWithPS((arr, client) => {
  val sampler = new 

[jira] [Commented] (SPARK-6703) Provide a way to discover existing SparkContext's

2015-04-13 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14492909#comment-14492909
 ] 

Ilya Ganelin commented on SPARK-6703:
-

Patrick - what's the timeline for the 1.4 release? Just want to have a
sense of it so I can schedule accordingly.

Thank you, 
Ilya Ganelin













 Provide a way to discover existing SparkContext's
 -

 Key: SPARK-6703
 URL: https://issues.apache.org/jira/browse/SPARK-6703
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Patrick Wendell
Assignee: Ilya Ganelin
Priority: Critical

 Right now it is difficult to write a Spark application in a way that can be 
 run independently and also be composed with other Spark applications in an 
 environment such as the JobServer, notebook servers, etc where there is a 
 shared SparkContext.
 It would be nice to provide a rendez-vous point so that applications can 
 learn whether an existing SparkContext already exists before creating one.
 The most simple/surgical way I see to do this is to have an optional static 
 SparkContext singleton that can be retrieved as follows:
 {code}
 val sc = SparkContext.getOrCreate(conf = new SparkConf())
 {code}
 And you could also have a setter where some outer framework/server can set it 
 for use by multiple downstream applications.
 A more advanced version of this would have some named registry or something, 
 but since we only support a single SparkContext in one JVM at this point 
 anyways, this seems sufficient and much simpler. Another advanced option 
 would be to allow plugging in some other notion of configuration you'd pass 
 when retrieving an existing context.
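The proposal above boils down to a lazily set singleton behind a getOrCreate accessor plus a setter. A generic sketch of that pattern, with `Ctx` as an illustrative stand-in for SparkContext (not Spark's actual implementation):

```scala
// Stand-in for SparkContext; the conf is just a string map here.
class Ctx(val conf: Map[String, String])

object CtxRegistry {
  private var active: Option[Ctx] = None

  // Return the existing context if one is registered; otherwise
  // build one from `conf` (evaluated only when needed) and keep it.
  def getOrCreate(conf: => Map[String, String]): Ctx = synchronized {
    active.getOrElse {
      val c = new Ctx(conf)
      active = Some(c)
      c
    }
  }

  // The "setter" mentioned above: an outer framework/server can
  // install a shared context for downstream applications.
  def setActive(c: Ctx): Unit = synchronized { active = Some(c) }
}
```

Because only one context is supported per JVM, a single `Option` slot suffices; a named registry would generalize this if that constraint were lifted.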



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6703) Provide a way to discover existing SparkContext's

2015-04-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14491877#comment-14491877
 ] 

Ilya Ganelin commented on SPARK-6703:
-

Patrick - I can look into this. Thank you.

 Provide a way to discover existing SparkContext's
 -

 Key: SPARK-6703
 URL: https://issues.apache.org/jira/browse/SPARK-6703
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Patrick Wendell




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions

2015-04-10 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14490545#comment-14490545
 ] 

Ilya Ganelin commented on SPARK-6839:
-

Imran - I can knock this out. Thanks!

 BlockManager.dataDeserialize leaks resources on user exceptions
 ---

 Key: SPARK-6839
 URL: https://issues.apache.org/jira/browse/SPARK-6839
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Imran Rashid

 From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized 
 that 
 [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202]
  doesn't  guarantee the underlying InputStream is properly closed.  In 
 particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time 
 there is an exception in user code.
 The problem is that right now, we convert the input streams to iterators, and 
 only close the input stream if the end of the iterator is reached.  But, we 
 might never reach the end of the iterator -- the obvious case is if there is 
 a bug in the user code, so tasks fail part of the way through the iterator.
 I think the solution is to give {{BlockManager.dataDeserialize}} a 
 {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do 
 the cleanup (as is done in {{ShuffleBlockFetcherIterator}}).
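The suggested fix pattern, registering cleanup on task completion instead of relying on iterator exhaustion, can be sketched generically; `TaskCtx` below is a toy stand-in, not Spark's actual TaskContext API:

```scala
import scala.collection.mutable.ListBuffer

// Minimal stand-in for TaskContext.addTaskCompletionListener:
// callbacks run when the task completes, whether or not the
// iterator was fully consumed.
class TaskCtx {
  private val listeners = ListBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  def markTaskCompleted(): Unit = listeners.foreach(_())
}

// Wrap deserialization so the underlying resource is released on
// task completion even if user code throws mid-iteration.
def dataDeserialize[T](ctx: TaskCtx, close: () => Unit,
                       it: Iterator[T]): Iterator[T] = {
  ctx.addTaskCompletionListener(close)
  it
}
```

The point is that `close` no longer depends on the consumer reaching the end of the iterator; abandoning it halfway still triggers cleanup at task completion.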



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions

2015-04-10 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14490576#comment-14490576
 ] 

Ilya Ganelin commented on SPARK-6839:
-

The obvious solution won't work. 

Adding a ```TaskContext``` to ```dataSerialize()``` won't work because it's 
called from within both ```MemoryStore``` and ```TachyonStore``` which are 
instantiated within the ```BlockManager``` constructor. The ```TaskContext``` 
also can't be created within the constructor for ```BlockManager``` since 
that's created within the ```SparkEnv``` constructor which has no tasks 
associated with it.

The only workable solution that I can see is to assign a ```TaskContext``` to 
the ```BlockManager``` at run-time but that sounds very sketchy to me since the 
block manager is a singleton and we may have multiple tasks going at once. Any 
thoughts on this conundrum?

 BlockManager.dataDeserialize leaks resources on user exceptions
 ---

 Key: SPARK-6839
 URL: https://issues.apache.org/jira/browse/SPARK-6839
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Imran Rashid




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions

2015-04-10 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14490576#comment-14490576
 ] 

Ilya Ganelin edited comment on SPARK-6839 at 4/11/15 12:07 AM:
---

The obvious solution won't work. 

Adding a {code}TaskContext{code} to {code}dataSerialize(){code} won't work 
because it's called from within both {code}MemoryStore{code} and 
{code}TachyonStore{code} which are instantiated within the 
{code}BlockManager{code} constructor. The {code}TaskContext{code} also can't be 
created within the constructor for {code}BlockManager{code} since that's 
created within the {code}SparkEnv{code} constructor which has no tasks 
associated with it.

The only workable solution that I can see is to assign a 
{code}TaskContext{code} to the {code}BlockManager{code} at run-time but that 
sounds very sketchy to me since the block manager is a singleton and we may 
have multiple tasks going at once. Any thoughts on this conundrum?


was (Author: ilganeli):
The obvious solution won't work. 

Adding a ```TaskContext``` to ```dataSerialize()``` won't work because it's 
called from within both ```MemoryStore``` and ```TachyonStore``` which are 
instantiated within the ```BlockManager``` constructor. The ```TaskContext``` 
also can't be created within the constructor for ```BlockManager``` since 
that's created within the ```SparkEnv``` constructor which has no tasks 
associated with it.

The only workable solution that I can see is to assign a ```TaskContext``` to 
the ```BlockManager``` at run-time but that sounds very sketchy to me since the 
block manager is a singleton and we may have multiple tasks going at once. Any 
thoughts on this conundrum?

 BlockManager.dataDeserialize leaks resources on user exceptions
 ---

 Key: SPARK-6839
 URL: https://issues.apache.org/jira/browse/SPARK-6839
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Imran Rashid




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6839) BlockManager.dataDeserialize leaks resources on user exceptions

2015-04-10 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14490576#comment-14490576
 ] 

Ilya Ganelin edited comment on SPARK-6839 at 4/11/15 12:09 AM:
---

The obvious solution won't work. 

Adding a {{TaskContext}} to {{dataSerialize()}} won't work because it's called 
from within both {{MemoryStore}} and {{TachyonStore}} which are instantiated 
within the {{BlockManager}} constructor. The {{TaskContext}} also can't be 
created within the constructor for {{BlockManager}} since that's created within 
the {{SparkEnv}} constructor which has no tasks associated with it.

The only workable solution that I can see is to assign a {{TaskContext}} to the 
{{BlockManager}} at run-time but that sounds very sketchy to me since the block 
manager is a singleton and we may have multiple tasks going at once. Any 
thoughts on this conundrum?


was (Author: ilganeli):
The obvious solution won't work. 

Adding a {code}TaskContext{code} to {code}dataSerialize(){code} won't work 
because it's called from within both {code}MemoryStore{code} and 
{code}TachyonStore{code} which are instantiated within the 
{code}BlockManager{code} constructor. The {code}TaskContext{code} also can't be 
created within the constructor for {code}BlockManager{code} since that's 
created within the {code}SparkEnv{code} constructor which has no tasks 
associated with it.

The only workable solution that I can see is to assign a 
{code}TaskContext{code} to the {code}BlockManager{code} at run-time but that 
sounds very sketchy to me since the block manager is a singleton and we may 
have multiple tasks going at once. Any thoughts on this conundrum?

 BlockManager.dataDeserialize leaks resources on user exceptions
 ---

 Key: SPARK-6839
 URL: https://issues.apache.org/jira/browse/SPARK-6839
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Imran Rashid

 From a discussion with [~vanzin] on {{ByteBufferInputStream}}, we realized 
 that 
 [{{BlockManager.dataDeserialize}}|https://github.com/apache/spark/blob/b5c51c8df480f1a82a82e4d597d8eea631bffb4e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1202]
  doesn't  guarantee the underlying InputStream is properly closed.  In 
 particular, {{BlockManager.dispose(byteBuffer)}} will not get called any time 
 there is an exception in user code.
 The problem is that right now, we convert the input streams to iterators, and 
 only close the input stream if the end of the iterator is reached.  But, we 
 might never reach the end of the iterator -- the obvious case is if there is 
 a bug in the user code, so tasks fail part of the way through the iterator.
 I think the solution is to give {{BlockManager.dataDeserialize}} a 
 {{TaskContext}} so it can call {{context.addTaskCompletionListener}} to do 
 the cleanup (as is done in {{ShuffleBlockFetcherIterator}}).
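The proposed fix lives in Spark's Scala code, but the completion-listener pattern itself is easy to sketch. Below is a minimal, language-neutral illustration in Python; `TaskContext`, `add_task_completion_listener`, and `data_deserialize` are stand-in names mirroring the Scala API, not Spark's actual implementation:

```python
import io

class TaskContext:
    """Minimal stand-in for Spark's TaskContext (illustrative only)."""
    def __init__(self):
        self._listeners = []

    def add_task_completion_listener(self, fn):
        # Mirrors Scala's TaskContext.addTaskCompletionListener:
        # record a callback to run when the task finishes.
        self._listeners.append(fn)

    def mark_task_completed(self):
        # Run every registered listener, whether the task body
        # succeeded or threw -- this is what guarantees cleanup.
        for fn in self._listeners:
            fn()

def data_deserialize(ctx, stream):
    # Register cleanup up front: the stream is closed on task
    # completion even if the returned iterator is never fully
    # consumed (e.g. the task fails partway through).
    ctx.add_task_completion_listener(stream.close)
    return iter(stream.read().split())
```

With this shape, an exception in user code no longer leaks the stream: the task-completion hook closes it regardless of how far the iterator was consumed.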



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6780) Add saveAsTextFileByKey method for PySpark

2015-04-08 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-6780:
---

 Summary: Add saveAsTextFileByKey method for PySpark
 Key: SPARK-6780
 URL: https://issues.apache.org/jira/browse/SPARK-6780
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Reporter: Ilya Ganelin


The PySpark API should have a method to allow saving a key-value RDD to 
subdirectories organized by key, as in:

https://issues.apache.org/jira/browse/SPARK-3533
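The desired output layout can be sketched without Spark at all. The following Python function (a local, non-Spark illustration; the name and `part-00000` filename are assumptions modeled on Hadoop conventions) writes each key's values under its own subdirectory:

```python
import os

def save_as_text_file_by_key(pairs, base_path):
    """Write the values of a key-value dataset to base_path/<key>/,
    one value per line -- a local sketch of the proposed behavior."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    for key, values in grouped.items():
        key_dir = os.path.join(base_path, str(key))
        os.makedirs(key_dir, exist_ok=True)
        # Hadoop-style part file name, one per key directory.
        with open(os.path.join(key_dir, "part-00000"), "w") as out:
            for v in values:
                out.write(f"{v}\n")
```

In real PySpark this would be backed by a Hadoop `OutputFormat` that routes records by key, as the Scala/Java implementation in SPARK-3533 does.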






[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark

2015-04-08 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14485577#comment-14485577
 ] 

Ilya Ganelin commented on SPARK-6780:
-

SPARK-3533 defines matching methods for Scala and Java APIs.

 Add saveAsTextFileByKey method for PySpark
 --

 Key: SPARK-6780
 URL: https://issues.apache.org/jira/browse/SPARK-6780
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Reporter: Ilya Ganelin

 The PySpark API should have a method to allow saving a key-value RDD to 
 subdirectories organized by key as in :
 https://issues.apache.org/jira/browse/SPARK-3533






[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark

2015-04-08 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14485582#comment-14485582
 ] 

Ilya Ganelin commented on SPARK-6780:
-

This code was my attempt to implement this within PythonRDD.scala but I ran 
into run-time reflection issues I could not solve.

{code}
  /**
   * Output a Python RDD of key-value pairs to any Hadoop file system such that 
the values within
   * the rdd are written to sub-directories organized by the associated key.
   *
   * Keys and values are converted to suitable output types using either user 
specified converters
   * or, if not specified, 
[[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion
   * types `keyClass` and `valueClass` are automatically inferred if not 
specified. The passed-in
   * `confAsMap` is merged with the default Hadoop conf associated with the 
SparkContext of
   * this RDD.
   */
  def saveAsHadoopFileByKey[K, V, C <: CompressionCodec](
  pyRDD: JavaRDD[Array[Byte]],
  batchSerialized: Boolean,
  path: String,
  outputFormatClass: String,
  keyClass: String,
  valueClass: String,
  keyConverterClass: String,
  valueConverterClass: String,
  confAsMap: java.util.HashMap[String, String],
  compressionCodecClass: String) = {
val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
  inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val codec = 
Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
  new JavaToWritableConverter)

converted.saveAsHadoopFile(path,
  ClassUtils.primitiveToWrapper(kc),
  ClassUtils.primitiveToWrapper(vc),
  classOf[RDDMultipleTextOutputFormat[K,V]],
  new JobConf(mergedConf),
  codec=codec)
  }

{code}

 Add saveAsTextFileByKey method for PySpark
 --

 Key: SPARK-6780
 URL: https://issues.apache.org/jira/browse/SPARK-6780
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Reporter: Ilya Ganelin

 The PySpark API should have a method to allow saving a key-value RDD to 
 subdirectories organized by key as in :
 https://issues.apache.org/jira/browse/SPARK-3533






[jira] [Commented] (SPARK-6780) Add saveAsTextFileByKey method for PySpark

2015-04-08 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14485586#comment-14485586
 ] 

Ilya Ganelin commented on SPARK-6780:
-

Matching test code:

{code}
 test("saveAsHadoopFileByKey should generate a text file per key") {
val testPairs : JavaRDD[Array[Byte]] = sc.parallelize(
  Seq(
Array(1.toByte,1.toByte),
Array(2.toByte,4.toByte),
Array(3.toByte,9.toByte),
Array(4.toByte,16.toByte),
Array(5.toByte,25.toByte))
).toJavaRDD()

val fs = FileSystem.get(new Configuration())
val basePath = sc.conf.get("spark.local.dir", "/tmp")
val fullPath = basePath + "/testPath"
fs.delete(new Path(fullPath), true)

PythonRDD.saveAsHadoopFileByKey(
  testPairs,
  false,
  fullPath,
  classOf[RDDMultipleTextOutputFormat].toString,
  classOf[Int].toString,
  classOf[Int].toString,
  null,
  null,
  new java.util.HashMap(), null)

// Test that a file was created for each key
(1 to 5).foreach(key => {
  val testPath = new Path(fullPath + "/" + key)
  assert(fs.exists(testPath))

  // Read the file and test that the contents are the values matching that 
key split by line
  val input = fs.open(testPath)
  val reader = new BufferedReader(new InputStreamReader(input))
  val values = new HashSet[Int]
  val lines = Stream.continually(reader.readLine()).takeWhile(_ != null)
  lines.foreach(s => values += s.toInt)

  assert(values.contains(key*key))
})

fs.delete(new Path(fullPath), true)
  }

{code}

 Add saveAsTextFileByKey method for PySpark
 --

 Key: SPARK-6780
 URL: https://issues.apache.org/jira/browse/SPARK-6780
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Reporter: Ilya Ganelin

 The PySpark API should have a method to allow saving a key-value RDD to 
 subdirectories organized by key as in :
 https://issues.apache.org/jira/browse/SPARK-3533






[jira] [Created] (SPARK-6746) Refactor large functions in DAGScheduler to improve readibility

2015-04-07 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-6746:
---

 Summary: Refactor large functions in DAGScheduler to improve 
readibility
 Key: SPARK-6746
 URL: https://issues.apache.org/jira/browse/SPARK-6746
 Project: Spark
  Issue Type: Sub-task
Reporter: Ilya Ganelin


The DAGScheduler class contains two huge functions that make it 
very hard to understand what's going on in the code. These are:

1) The monolithic handleTaskCompletion 
2) The cleanupStateForJobAndIndependentStages function

These can be simply modularized to eliminate some awkward type casting and 
improve code readability. 






[jira] [Commented] (SPARK-6746) Refactor large functions in DAGScheduler to improve readibility

2015-04-07 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14483561#comment-14483561
 ] 

Ilya Ganelin commented on SPARK-6746:
-

SPARK-5945 requires updating the logic for handling fetch failures. This will 
introduce merge conflicts with this patch so this one should be merged first 
since it cleans up the DAG Scheduler code to help understand what's going on.

 Refactor large functions in DAGScheduler to improve readibility
 ---

 Key: SPARK-6746
 URL: https://issues.apache.org/jira/browse/SPARK-6746
 Project: Spark
  Issue Type: Sub-task
  Components: Scheduler
Reporter: Ilya Ganelin

 The DAGScheduler class contains two huge functions that make it 
 very hard to understand what's going on in the code. These are:
 1) The monolithic handleTaskCompletion 
 2) The cleanupStateForJobAndIndependentStages function
 These can be simply modularized to eliminate some awkward type casting and 
 improve code readability. 






[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
in {{SparkContext.stop()}}.

A cursory examination reveals this in {{SparkContext.stop()}}, 
{{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.
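The alternative to the early-exit guard is cleanup that always runs every step and surfaces the first failure afterwards. A minimal Python sketch of that pattern (names are illustrative, not Spark's code):

```python
class StopError(RuntimeError):
    pass

def stop_all(components):
    """Stop every component even if one fails, then re-raise the
    first error. Unlike `if (!stopped) { stopped = true; ... }`,
    no cleanup step is silently skipped."""
    first_error = None
    for component in components:
        try:
            component.stop()
        except Exception as exc:
            # Remember the first failure but keep stopping the rest.
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error
```

Here a failure in one `stop()` still lets the remaining components shut down, and the caller still sees the error, so the incomplete-cleanup failure mode described above cannot occur.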



  was:
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop().{{code}}

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop().{{code}}




 IsStopped set to true in before stop() is complete.
 ---

 Key: SPARK-6616
 URL: https://issues.apache.org/jira/browse/SPARK-6616
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Ilya Ganelin

 There are numerous instances throughout the code base of the following:
 {code}
 if (!stopped) {
 stopped = true
 ...
 }
 {code}
 In general, this is bad practice since it can cause an incomplete cleanup if 
 there is an error during shutdown and not all code executes. Incomplete 
 cleanup is harder to track down than a double cleanup that triggers some 
 error. I propose fixing this throughout the code, starting with the cleanup 
 sequence in {{SparkContext.stop()}}.
 A cursory examination reveals this in {{SparkContext.stop()}}, 
 {{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.






[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
in {{SparkContext.stop()}}.

A cursory examination reveals this in {{SparkContext.stop()}}, 
{{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.



  was:
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop()```.{{code}}

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop().{{code}}




 IsStopped set to true in before stop() is complete.
 ---

 Key: SPARK-6616
 URL: https://issues.apache.org/jira/browse/SPARK-6616
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Ilya Ganelin

 There are numerous instances throughout the code base of the following:
 {code}
 if (!stopped) {
 stopped = true
 ...
 }
 {code}
 In general, this is bad practice since it can cause an incomplete cleanup if 
 there is an error during shutdown and not all code executes. Incomplete 
 cleanup is harder to track down than a double cleanup that triggers some 
 error. I propose fixing this throughout the code, starting with the cleanup 
 sequence in {{SparkContext.stop()}}.
 A cursory examination reveals this in {{SparkContext.stop()}}, 
 {{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.






[jira] [Commented] (SPARK-6492) SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies

2015-03-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387499#comment-14387499
 ] 

Ilya Ganelin commented on SPARK-6492:
-

Would it be reasonable to fix this by adding some timeout/retry logic in the 
DAGScheduler shutdown code? If so, I can take care of this. Thanks. 

 SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies
 ---

 Key: SPARK-6492
 URL: https://issues.apache.org/jira/browse/SPARK-6492
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.3.0, 1.4.0
Reporter: Josh Rosen
Priority: Critical

 A deadlock can occur when DAGScheduler death causes a SparkContext to be shut 
 down while user code is concurrently racing to stop the SparkContext in a 
 finally block.
 For example:
 {code}
 try {
   sc = new SparkContext("local", "test")
   // start running a job that causes the DAGSchedulerEventProcessor to 
 crash
   someRDD.doStuff()
 } finally {
   sc.stop() // stop the sparkcontext once the failure in DAGScheduler causes 
 the above job to fail with an exception
 }
 {code}
 This leads to a deadlock.  The event processor thread tries to lock on the 
 {{SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK}} and becomes blocked because 
 the thread that holds that lock is waiting for the event processor thread to 
 join:
 {code}
 dag-scheduler-event-loop daemon prio=5 tid=0x7ffa69456000 nid=0x9403 
 waiting for monitor entry [0x0001223ad000]
java.lang.Thread.State: BLOCKED (on object monitor)
   at org.apache.spark.SparkContext.stop(SparkContext.scala:1398)
   - waiting to lock 0x0007f5037b08 (a java.lang.Object)
   at 
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onError(DAGScheduler.scala:1412)
   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:52)
 {code}
 {code}
 pool-1-thread-1-ScalaTest-running-SparkContextSuite prio=5 
 tid=0x7ffa69864800 nid=0x5903 in Object.wait() [0x0001202dc000]
java.lang.Thread.State: WAITING (on object monitor)
   at java.lang.Object.wait(Native Method)
   - waiting on 0x0007f4b28000 (a 
 org.apache.spark.util.EventLoop$$anon$1)
   at java.lang.Thread.join(Thread.java:1281)
   - locked 0x0007f4b28000 (a 
 org.apache.spark.util.EventLoop$$anon$1)
   at java.lang.Thread.join(Thread.java:1355)
   at org.apache.spark.util.EventLoop.stop(EventLoop.scala:79)
   at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1352)
   at org.apache.spark.SparkContext.stop(SparkContext.scala:1405)
   - locked 0x0007f5037b08 (a java.lang.Object)
 [...]
 {code}
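The core hazard in the trace above is a thread joining itself (or joining while holding a lock the other thread needs). A minimal Python sketch of the self-join guard, with names that are illustrative rather than Spark's actual `EventLoop` implementation:

```python
import threading

class EventLoop:
    """Stoppable event-loop thread with a self-join guard in stop():
    joining the event thread from inside the event thread itself
    would block forever."""
    def __init__(self):
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        # Stand-in for the event-processing loop: park until stopped.
        self._stopped.wait()

    def stop(self):
        self._stopped.set()
        # Never join the event thread from within itself -- if the
        # event thread triggers the shutdown (as in the onError path
        # above), skip the join instead of deadlocking.
        if threading.current_thread() is not self._thread:
            self._thread.join()
```

This avoids the self-join half of the problem; the lock-ordering half (the constructor lock held while joining) still needs the join to happen outside any lock the event thread may try to take.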






[jira] [Comment Edited] (SPARK-6492) SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies

2015-03-30 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387499#comment-14387499
 ] 

Ilya Ganelin edited comment on SPARK-6492 at 3/30/15 10:03 PM:
---

Would it be reasonable to fix this by adding some timeout/retry logic in the 
SparkContext shutdown code? If so, I can take care of this. Thanks. 


was (Author: ilganeli):
Would it be reasonable to fix this by adding some timeout/retry logic in the 
DAGScheduler shutdown code? If so, I can take care of this. Thanks. 

 SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies
 ---

 Key: SPARK-6492
 URL: https://issues.apache.org/jira/browse/SPARK-6492
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.3.0, 1.4.0
Reporter: Josh Rosen
Priority: Critical

 A deadlock can occur when DAGScheduler death causes a SparkContext to be shut 
 down while user code is concurrently racing to stop the SparkContext in a 
 finally block.
 For example:
 {code}
 try {
   sc = new SparkContext("local", "test")
   // start running a job that causes the DAGSchedulerEventProcessor to 
 crash
   someRDD.doStuff()
 } finally {
   sc.stop() // stop the sparkcontext once the failure in DAGScheduler causes 
 the above job to fail with an exception
 }
 {code}
 This leads to a deadlock.  The event processor thread tries to lock on the 
 {{SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK}} and becomes blocked because 
 the thread that holds that lock is waiting for the event processor thread to 
 join:
 {code}
 dag-scheduler-event-loop daemon prio=5 tid=0x7ffa69456000 nid=0x9403 
 waiting for monitor entry [0x0001223ad000]
java.lang.Thread.State: BLOCKED (on object monitor)
   at org.apache.spark.SparkContext.stop(SparkContext.scala:1398)
   - waiting to lock 0x0007f5037b08 (a java.lang.Object)
   at 
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onError(DAGScheduler.scala:1412)
   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:52)
 {code}
 {code}
 pool-1-thread-1-ScalaTest-running-SparkContextSuite prio=5 
 tid=0x7ffa69864800 nid=0x5903 in Object.wait() [0x0001202dc000]
java.lang.Thread.State: WAITING (on object monitor)
   at java.lang.Object.wait(Native Method)
   - waiting on 0x0007f4b28000 (a 
 org.apache.spark.util.EventLoop$$anon$1)
   at java.lang.Thread.join(Thread.java:1281)
   - locked 0x0007f4b28000 (a 
 org.apache.spark.util.EventLoop$$anon$1)
   at java.lang.Thread.join(Thread.java:1355)
   at org.apache.spark.util.EventLoop.stop(EventLoop.scala:79)
   at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1352)
   at org.apache.spark.SparkContext.stop(SparkContext.scala:1405)
   - locked 0x0007f5037b08 (a java.lang.Object)
 [...]
 {code}






[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {code}SparkContext.stop() {code}.

A cursory examination reveals this in {code}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {code}.



  was:
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop() {{code}}.

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {{code}}.




 IsStopped set to true in before stop() is complete.
 ---

 Key: SPARK-6616
 URL: https://issues.apache.org/jira/browse/SPARK-6616
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Ilya Ganelin

 There are numerous instances throughout the code base of the following:
 {code}
 if (!stopped) {
 stopped = true
 ...
 }
 {code}
 In general, this is bad practice since it can cause an incomplete cleanup if 
 there is an error during shutdown and not all code executes. Incomplete 
 cleanup is harder to track down than a double cleanup that triggers some 
 error. I propose fixing this throughout the code, starting with the cleanup 
 sequence with {code}SparkContext.stop() {code}.
 A cursory examination reveals this in {code}SparkContext.stop(), 
 SparkEnv.stop(), and ContextCleaner.stop() {code}.






[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {code}SparkContext.stop() {code}

A cursory examination reveals this in {code}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {code}



  was:
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {code}SparkContext.stop() {code}.

A cursory examination reveals this in {code}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {code}.




 IsStopped set to true in before stop() is complete.
 ---

 Key: SPARK-6616
 URL: https://issues.apache.org/jira/browse/SPARK-6616
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Ilya Ganelin

 There are numerous instances throughout the code base of the following:
 {code}
 if (!stopped) {
 stopped = true
 ...
 }
 {code}
 In general, this is bad practice since it can cause an incomplete cleanup if 
 there is an error during shutdown and not all code executes. Incomplete 
 cleanup is harder to track down than a double cleanup that triggers some 
 error. I propose fixing this throughout the code, starting with the cleanup 
 sequence with {code}SparkContext.stop() {code}
 A cursory examination reveals this in {code}SparkContext.stop(), 
 SparkEnv.stop(), and ContextCleaner.stop() {code}






[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
in {{SparkContext.stop()}}.

A cursory examination reveals this in {{SparkContext.stop()}}, 
{{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.



  was:
There are numerous instances throughout the code base of the following:

{{code}}
if (!stopped) {
stopped = true
...
}
{{code}}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with {{code}}SparkContext.stop() {{code}}.

A cursory examination reveals this in {{code}}SparkContext.stop(), 
SparkEnv.stop(), and ContextCleaner.stop() {{code}}.




 IsStopped set to true in before stop() is complete.
 ---

 Key: SPARK-6616
 URL: https://issues.apache.org/jira/browse/SPARK-6616
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Ilya Ganelin

 There are numerous instances throughout the code base of the following:
 {code}
 if (!stopped) {
 stopped = true
 ...
 }
 {code}
 In general, this is bad practice since it can cause an incomplete cleanup if 
 there is an error during shutdown and not all code executes. Incomplete 
 cleanup is harder to track down than a double cleanup that triggers some 
 error. I propose fixing this throughout the code, starting with the cleanup 
 sequence in {{SparkContext.stop()}}.
 A cursory examination reveals this in {{SparkContext.stop()}}, 
 {{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.






[jira] [Updated] (SPARK-6616) IsStopped set to true in before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ilya Ganelin updated SPARK-6616:

Description: 
There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
in {{SparkContext.stop()}}.

A cursory examination reveals this in {{SparkContext.stop()}}, 
{{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.



  was:
There are numerous instances throughout the code base of the following:

```
if (!stopped) {
stopped = true
...
}
```

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
with ```SparkContext.stop()```.

A cursory examination reveals this in ```SparkContext.stop(), SparkEnv.stop(), 
and ContextCleaner.stop()```.




 IsStopped set to true in before stop() is complete.
 ---

 Key: SPARK-6616
 URL: https://issues.apache.org/jira/browse/SPARK-6616
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Ilya Ganelin

 There are numerous instances throughout the code base of the following:
 {code}
 if (!stopped) {
 stopped = true
 ...
 }
 {code}
 In general, this is bad practice since it can cause an incomplete cleanup if 
 there is an error during shutdown and not all code executes. Incomplete 
 cleanup is harder to track down than a double cleanup that triggers some 
 error. I propose fixing this throughout the code, starting with the cleanup 
 sequence in {{SparkContext.stop()}}.
 A cursory examination reveals this in {{SparkContext.stop()}}, 
 {{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.






[jira] [Created] (SPARK-6616) IsStopped set to true in before stop() is complete.

2015-03-30 Thread Ilya Ganelin (JIRA)
Ilya Ganelin created SPARK-6616:
---

 Summary: IsStopped set to true in before stop() is complete.
 Key: SPARK-6616
 URL: https://issues.apache.org/jira/browse/SPARK-6616
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Ilya Ganelin


There are numerous instances throughout the code base of the following:

{code}
if (!stopped) {
stopped = true
...
}
{code}

In general, this is bad practice since it can cause an incomplete cleanup if 
there is an error during shutdown and not all code executes. Incomplete cleanup 
is harder to track down than a double cleanup that triggers some error. I 
propose fixing this throughout the code, starting with the cleanup sequence 
in {{SparkContext.stop()}}.

A cursory examination reveals this in {{SparkContext.stop()}}, 
{{SparkEnv.stop()}}, and {{ContextCleaner.stop()}}.








[jira] [Commented] (SPARK-5945) Spark should not retry a stage infinitely on a FetchFailedException

2015-03-18 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367891#comment-14367891
 ] 

Ilya Ganelin commented on SPARK-5945:
-

Hi Imran - I'd be happy to tackle this. Could you please assign it to me? Thank 
you. 

 Spark should not retry a stage infinitely on a FetchFailedException
 ---

 Key: SPARK-5945
 URL: https://issues.apache.org/jira/browse/SPARK-5945
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Imran Rashid

 While investigating SPARK-5928, I noticed some very strange behavior in the 
 way spark retries stages after a FetchFailedException.  It seems that on a 
 FetchFailedException, instead of simply killing the task and retrying, Spark 
 aborts the stage and retries.  If it just retried the task, the task might 
 fail 4 times and then trigger the usual job killing mechanism.  But by 
 killing the stage instead, the max retry logic is skipped (it looks to me 
 like there is no limit for retries on a stage).
 After a bit of discussion with Kay Ousterhout, it seems the idea is that if a 
 fetch fails, we assume that the block manager we are fetching from has 
 failed, and that it will succeed if we retry the stage w/out that block 
 manager.  In that case, it wouldn't make any sense to retry the task, since 
 it's doomed to fail every time, so we might as well kill the whole stage.  But 
 this raises two questions:
 1) Is it really safe to assume that a FetchFailedException means that the 
 BlockManager has failed, and it will work if we just try another one?  
 SPARK-5928 shows that there are at least some cases where that assumption is 
 wrong.  Even if we fix that case, this logic seems brittle to the next case 
 we find.  I guess the idea is that this behavior is what gives us the R in 
 RDD ... but it seems like it's not really that robust and maybe should be 
 reconsidered.
 2) Should stages only be retried a limited number of times?  It would be 
 pretty easy to put in a limited number of retries per stage.  Though again, 
 we encounter issues with keeping things resilient.  Theoretically one stage 
 could accumulate many retries due to failures in different stages further 
 downstream, so we might need to track the cause of each retry as well to 
 still have the desired behavior.
 In general it just seems there is some flakiness in the retry logic.  This is 
 the only reproducible example I have at the moment, but I vaguely recall 
 hitting other cases of strange behavior w/ retries when trying to run long 
 pipelines.  Eg., if one executor is stuck in a GC during a fetch, the fetch 
 fails, but the executor eventually comes back and the stage gets retried 
 again, but the same GC issues happen the second time around, etc.
 Copied from SPARK-5928, here's the example program that can regularly produce 
 a loop of stage failures.  Note that it will only fail from a remote fetch, 
 so it can't be run locally -- I ran with {{MASTER=yarn-client spark-shell 
 --num-executors 2 --executor-memory 4000m}}
 {code}
 val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { ignore =>
   val n = 3e3.toInt
   val arr = new Array[Byte](n)
   //need to make sure the array doesn't compress to something small
   scala.util.Random.nextBytes(arr)
   arr
 }
 rdd.map { x => (1, x) }.groupByKey().count()
 {code}
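One possible shape for the limited-retry idea in question (2) is a per-stage attempt counter consulted on each fetch failure. The names below are illustrative and do not correspond to the real DAGScheduler internals:

```scala
import scala.collection.mutable

// Illustrative sketch of capping stage resubmissions after fetch failures.
class StageRetryTracker(maxRetries: Int) {
  private val attempts = mutable.Map.empty[Int, Int].withDefaultValue(0)

  /** Record a fetch-failure-driven resubmission; returns false once the cap is hit. */
  def mayRetry(stageId: Int): Boolean = {
    attempts(stageId) += 1
    attempts(stageId) <= maxRetries
  }
}

val tracker = new StageRetryTracker(maxRetries = 2)
assert(tracker.mayRetry(0))   // first retry allowed
assert(tracker.mayRetry(0))   // second retry allowed
assert(!tracker.mayRetry(0))  // third failure: give up and abort the job
```

Keying the counter on the failure's root cause as well, as suggested above, would mean widening the map key to a (stageId, cause) pair.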






[jira] [Commented] (SPARK-5932) Use consistent naming for byte properties

2015-03-18 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367919#comment-14367919
 ] 

Ilya Ganelin commented on SPARK-5932:
-

[~andrewor14] - I can take this out. Thanks.

 Use consistent naming for byte properties
 -

 Key: SPARK-5932
 URL: https://issues.apache.org/jira/browse/SPARK-5932
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Andrew Or
Assignee: Andrew Or

 This is SPARK-5931's sister issue.
 The naming of existing byte configs is inconsistent. We currently have the 
 following throughout the code base:
 {code}
 spark.reducer.maxMbInFlight // megabytes
 spark.kryoserializer.buffer.mb // megabytes
 spark.shuffle.file.buffer.kb // kilobytes
 spark.broadcast.blockSize // kilobytes
 spark.executor.logs.rolling.size.maxBytes // bytes
 spark.io.compression.snappy.block.size // bytes
 {code}
 Instead, my proposal is to simplify the config name itself and make 
 everything accept sizes using the following format: 500b, 2k, 100m, 46g, 
 similar to what we currently use for our memory settings. For instance:
 {code}
 spark.reducer.maxSizeInFlight = 10m
 spark.kryoserializer.buffer = 2m
 spark.shuffle.file.buffer = 10k
 spark.broadcast.blockSize = 20k
 spark.executor.logs.rolling.maxSize = 500b
 spark.io.compression.snappy.blockSize = 200b
 {code}
 All existing configs that are relevant will be deprecated in favor of the new 
 ones. We should do this soon before we keep introducing more size configs.
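As a sketch of what accepting this format involves, a parser for strings like {{500b}}, {{2k}}, {{100m}}, {{46g}} could look like the following (binary units are assumed here; the actual convention is an implementation choice):

```scala
// Parse size strings like "500b", "2k", "100m", "46g" into a byte count.
def parseSize(s: String): Long = {
  val trimmed = s.trim.toLowerCase
  val multipliers = Map(
    'b' -> 1L,
    'k' -> 1024L,
    'm' -> 1024L * 1024,
    'g' -> 1024L * 1024 * 1024)
  multipliers.get(trimmed.last) match {
    case Some(mult) => trimmed.init.toLong * mult
    case None       => trimmed.toLong  // bare number: assume bytes
  }
}

assert(parseSize("500b") == 500L)
assert(parseSize("10k") == 10240L)
assert(parseSize("2m") == 2L * 1024 * 1024)
```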






[jira] [Comment Edited] (SPARK-5931) Use consistent naming for time properties

2015-03-18 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367915#comment-14367915
 ] 

Ilya Ganelin edited comment on SPARK-5931 at 3/18/15 9:09 PM:
--

[~andrewor14] - I can take this out. Thanks.


was (Author: ilganeli):
@andrewor - I can take this out. Thanks.

 Use consistent naming for time properties
 -

 Key: SPARK-5931
 URL: https://issues.apache.org/jira/browse/SPARK-5931
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Andrew Or
Assignee: Andrew Or

 This is SPARK-5932's sister issue.
 The naming of existing time configs is inconsistent. We currently have the 
 following throughout the code base:
 {code}
 spark.network.timeout // seconds
 spark.executor.heartbeatInterval // milliseconds
 spark.storage.blockManagerSlaveTimeoutMs // milliseconds
 spark.yarn.scheduler.heartbeat.interval-ms // milliseconds
 {code}
 Instead, my proposal is to simplify the config name itself and make 
 everything accept time using the following format: 5s, 2ms, 100us. For 
 instance:
 {code}
 spark.network.timeout = 5s
 spark.executor.heartbeatInterval = 500ms
 spark.storage.blockManagerSlaveTimeout = 100ms
 spark.yarn.scheduler.heartbeatInterval = 400ms
 {code}
 All existing configs that are relevant will be deprecated in favor of the new 
 ones. We should do this soon before we keep introducing more time configs.
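A sketch of parsing the proposed {{5s}}/{{2ms}}/{{100us}} format (normalizing to microseconds, which is an arbitrary target unit chosen here for illustration):

```scala
// Parse time strings like "5s", "500ms", "100us" into microseconds.
// The longer suffixes must be checked before the bare "s" suffix.
def parseTimeUs(s: String): Long = {
  val trimmed = s.trim.toLowerCase
  if (trimmed.endsWith("ms")) trimmed.dropRight(2).toLong * 1000L
  else if (trimmed.endsWith("us")) trimmed.dropRight(2).toLong
  else if (trimmed.endsWith("s")) trimmed.dropRight(1).toLong * 1000000L
  else trimmed.toLong  // bare number: unit left to the caller's convention
}

assert(parseTimeUs("5s") == 5000000L)
assert(parseTimeUs("500ms") == 500000L)
assert(parseTimeUs("100us") == 100L)
```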






[jira] [Commented] (SPARK-5931) Use consistent naming for time properties

2015-03-18 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367915#comment-14367915
 ] 

Ilya Ganelin commented on SPARK-5931:
-

@andrewor - I can take this out. Thanks.

 Use consistent naming for time properties
 -

 Key: SPARK-5931
 URL: https://issues.apache.org/jira/browse/SPARK-5931
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Andrew Or
Assignee: Andrew Or

 This is SPARK-5932's sister issue.
 The naming of existing time configs is inconsistent. We currently have the 
 following throughout the code base:
 {code}
 spark.network.timeout // seconds
 spark.executor.heartbeatInterval // milliseconds
 spark.storage.blockManagerSlaveTimeoutMs // milliseconds
 spark.yarn.scheduler.heartbeat.interval-ms // milliseconds
 {code}
 Instead, my proposal is to simplify the config name itself and make 
 everything accept time using the following format: 5s, 2ms, 100us. For 
 instance:
 {code}
 spark.network.timeout = 5s
 spark.executor.heartbeatInterval = 500ms
 spark.storage.blockManagerSlaveTimeout = 100ms
 spark.yarn.scheduler.heartbeatInterval = 400ms
 {code}
 All existing configs that are relevant will be deprecated in favor of the new 
 ones. We should do this soon before we keep introducing more time configs.






[jira] [Commented] (SPARK-4927) Spark does not clean up properly during long jobs.

2015-03-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14358958#comment-14358958
 ] 

Ilya Ganelin commented on SPARK-4927:
-

Hi Sean - I have a code snippet that reproduced this. Let me send it to you in 
a bit - I don't have the means to run 1.3 in a cluster.



Sent with Good (www.good.com)




 Spark does not clean up properly during long jobs. 
 ---

 Key: SPARK-4927
 URL: https://issues.apache.org/jira/browse/SPARK-4927
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Ilya Ganelin

 On a long running Spark job, Spark will eventually run out of memory on the 
 driver node due to metadata overhead from the shuffle operation. Spark will 
 continue to operate, however with drastically decreased performance (since 
 swapping now occurs with every operation).
 The {{spark.cleaner.ttl}} parameter allows a user to configure when cleanup 
 happens, but the issue with doing this is that it isn’t done safely, e.g. if 
 this clears a cached RDD or active task in the middle of processing a stage, 
 this ultimately causes a KeyNotFoundException when the next stage attempts to 
 reference the cleared RDD or task.
 There should be a sustainable mechanism for cleaning up stale metadata that 
 allows the program to continue running. 
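For reference, the cleanup parameter in question is spelled {{spark.cleaner.ttl}} and takes a duration in seconds; a minimal configuration sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enable periodic metadata cleanup; as the issue notes, an aggressive TTL
// can clear cached RDDs or task metadata that a running stage still needs.
val conf = new SparkConf()
  .setAppName("long-running-job")
  .set("spark.cleaner.ttl", "3600")  // forget metadata older than one hour
val sc = new SparkContext(conf)
```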






[jira] [Commented] (SPARK-4927) Spark does not clean up properly during long jobs.

2015-03-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14359294#comment-14359294
 ] 

Ilya Ganelin commented on SPARK-4927:
-

Are you running over yarn? My theory is that the memory usage has to do with 
data movement between nodes.







 Spark does not clean up properly during long jobs. 
 ---

 Key: SPARK-4927
 URL: https://issues.apache.org/jira/browse/SPARK-4927
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Ilya Ganelin

 On a long running Spark job, Spark will eventually run out of memory on the 
 driver node due to metadata overhead from the shuffle operation. Spark will 
 continue to operate, however with drastically decreased performance (since 
 swapping now occurs with every operation).
 The {{spark.cleaner.ttl}} parameter allows a user to configure when cleanup 
 happens, but the issue with doing this is that it isn’t done safely, e.g. if 
 this clears a cached RDD or active task in the middle of processing a stage, 
 this ultimately causes a KeyNotFoundException when the next stage attempts to 
 reference the cleared RDD or task.
 There should be a sustainable mechanism for cleaning up stale metadata that 
 allows the program to continue running. 






[jira] [Comment Edited] (SPARK-4927) Spark does not clean up properly during long jobs.

2015-03-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14358958#comment-14358958
 ] 

Ilya Ganelin edited comment on SPARK-4927 at 3/12/15 6:50 PM:
--

Hi Sean - I have a code snippet that reproduced this. Let me send it to you in 
a bit - I don't have the means to run 1.3 in a cluster.

Realized that I already had that code snippet posted. Running the above code 
doesn't reproduce the issue?



was (Author: ilganeli):
Hi Sean - I have a code snippet that reproduced this. Let me send it to you in 
a bit - I don't have the means to run 1.3 in a cluster.







 Spark does not clean up properly during long jobs. 
 ---

 Key: SPARK-4927
 URL: https://issues.apache.org/jira/browse/SPARK-4927
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Ilya Ganelin

 On a long running Spark job, Spark will eventually run out of memory on the 
 driver node due to metadata overhead from the shuffle operation. Spark will 
 continue to operate, however with drastically decreased performance (since 
 swapping now occurs with every operation).
 The {{spark.cleaner.ttl}} parameter allows a user to configure when cleanup 
 happens, but the issue with doing this is that it isn’t done safely, e.g. if 
 this clears a cached RDD or active task in the middle of processing a stage, 
 this ultimately causes a KeyNotFoundException when the next stage attempts to 
 reference the cleared RDD or task.
 There should be a sustainable mechanism for cleaning up stale metadata that 
 allows the program to continue running. 






[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs

2015-03-04 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14347216#comment-14347216
 ] 

Ilya Ganelin commented on SPARK-3533:
-

[~aaronjosephs] - Let me see if that's it. Thanks!

 Add saveAsTextFileByKey() method to RDDs
 

 Key: SPARK-3533
 URL: https://issues.apache.org/jira/browse/SPARK-3533
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, Spark Core
Affects Versions: 1.1.0
Reporter: Nicholas Chammas

 Users often have a single RDD of key-value pairs that they want to save to 
 multiple locations based on the keys.
 For example, say I have an RDD like this:
 {code}
 >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben',
 ...                     'Frankie']).keyBy(lambda x: x[0])
 >>> a.collect()
 [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
 >>> a.keys().distinct().collect()
 ['B', 'F', 'N']
 {code}
 Now I want to write the RDD out to different paths depending on the keys, so 
 that I have one output directory per distinct key. Each output directory 
 could potentially have multiple {{part-}} files, one per RDD partition.
 So the output would look something like:
 {code}
 /path/prefix/B [/part-1, /part-2, etc]
 /path/prefix/F [/part-1, /part-2, etc]
 /path/prefix/N [/part-1, /part-2, etc]
 {code}
 Though it may be possible to do this with some combination of 
 {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the 
 {{MultipleTextOutputFormat}} output format class, it isn't straightforward. 
 It's not clear if it's even possible at all in PySpark.
 Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs 
 that makes it easy to save RDDs out to multiple locations at once.
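On the Scala side, the {{MultipleTextOutputFormat}} combination mentioned above is workable; a commonly used sketch (untested here) routes each record to a directory named after its key:

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Route each record to a subdirectory named after its key,
// and drop the key from the written text.
class KeyBasedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    s"${key.toString}/$name"  // e.g. writes under /path/prefix/N/

  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()  // suppress the key in the output lines
}

// Usage, given an RDD[(String, String)] named pairs:
// pairs.saveAsHadoopFile("/path/prefix", classOf[Any], classOf[Any],
//   classOf[KeyBasedOutputFormat])
```

This covers the Scala/Java case only; as the issue notes, it is unclear whether the same trick is reachable from PySpark, which is part of the motivation for a first-class {{saveAsTextFileByKey()}}.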






[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs

2015-03-02 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14343387#comment-14343387
 ] 

Ilya Ganelin commented on SPARK-3533:
-

Hey [~aaronjosephs], please feel free. I'm out of ideas for this one. You can 
see my code changes at the github link and the issue I ran into here. Thanks. 

 Add saveAsTextFileByKey() method to RDDs
 

 Key: SPARK-3533
 URL: https://issues.apache.org/jira/browse/SPARK-3533
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, Spark Core
Affects Versions: 1.1.0
Reporter: Nicholas Chammas

 Users often have a single RDD of key-value pairs that they want to save to 
 multiple locations based on the keys.
 For example, say I have an RDD like this:
 {code}
 >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben',
 ...                     'Frankie']).keyBy(lambda x: x[0])
 >>> a.collect()
 [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
 >>> a.keys().distinct().collect()
 ['B', 'F', 'N']
 {code}
 Now I want to write the RDD out to different paths depending on the keys, so 
 that I have one output directory per distinct key. Each output directory 
 could potentially have multiple {{part-}} files, one per RDD partition.
 So the output would look something like:
 {code}
 /path/prefix/B [/part-1, /part-2, etc]
 /path/prefix/F [/part-1, /part-2, etc]
 /path/prefix/N [/part-1, /part-2, etc]
 {code}
 Though it may be possible to do this with some combination of 
 {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the 
 {{MultipleTextOutputFormat}} output format class, it isn't straightforward. 
 It's not clear if it's even possible at all in PySpark.
 Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs 
 that makes it easy to save RDDs out to multiple locations at once.






[jira] [Commented] (SPARK-5845) Time to cleanup spilled shuffle files not included in shuffle write time

2015-02-26 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14339794#comment-14339794
 ] 

Ilya Ganelin commented on SPARK-5845:
-

I'm code complete on this, will submit a PR shortly.

 Time to cleanup spilled shuffle files not included in shuffle write time
 

 Key: SPARK-5845
 URL: https://issues.apache.org/jira/browse/SPARK-5845
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.3.0, 1.2.1
Reporter: Kay Ousterhout
Assignee: Ilya Ganelin
Priority: Minor

 When the disk is contended, I've observed cases when it takes as long as 7 
 seconds to clean up all of the intermediate spill files for a shuffle (when 
 using the sort based shuffle, but bypassing merging because there are <= 200 
 shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
 written from one of the tasks where I observed this).  This is effectively 
 part of the shuffle write time (because it's a necessary side effect of 
 writing data to disk) so should be added to the shuffle write time to 
 facilitate debugging.






[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs

2015-02-25 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14336795#comment-14336795
 ] 

Ilya Ganelin commented on SPARK-5750:
-

Did you have a particular doc in mind to update? I feel like this sort of 
comment should go in the programming guide but there's not really a good spot 
for it. One glaring omission in the guide is a general writeup of the shuffle 
operation and the role that it plays internally. Understanding shuffles is key 
to writing stable Spark applications yet there isn't really any mention of it 
outside of the tech talks and presentations from the Spark folks. My suggestion 
would be to create a section providing an overview of shuffle, what parameters 
influence its behavior and stability, and then add this comment to that 
section. 

 Document that ordering of elements in shuffled partitions is not 
 deterministic across runs
 --

 Key: SPARK-5750
 URL: https://issues.apache.org/jira/browse/SPARK-5750
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen

 The ordering of elements in shuffled partitions is not deterministic across 
 runs.  For instance, consider the following example:
 {code}
 val largeFiles = sc.textFile(...)
 val airlines = largeFiles.repartition(2000).cache()
 println(airlines.first)
 {code}
 If this code is run twice, then each run will output a different result.  
 There is non-determinism in the shuffle read code that accounts for this:
 Spark's shuffle read path processes blocks as soon as they are fetched. Spark 
 uses 
 [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala]
  to fetch shuffle data from mappers.  In this code, requests for multiple 
 blocks from the same host are batched together, so nondeterminism in where 
 tasks are run means that the set of requests can vary across runs.  In 
 addition, there's an [explicit 
 call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256]
  to randomize the order of the batched fetch requests.  As a result, shuffle 
 operations cannot be guaranteed to produce the same ordering of the elements 
 in their partitions.
 Therefore, Spark should update its docs to clarify that the ordering of 
 elements in shuffle RDDs' partitions is non-deterministic.  Note, however, 
 that the _set_ of elements in each partition will be deterministic: if we 
 used {{mapPartitions}} to sort each partition, then the {{first()}} call 
 above would produce a deterministic result.
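The {{mapPartitions}} workaround described in the last paragraph, applied to the example above, would look roughly like:

```scala
// Sorting within each partition restores determinism for first(), because the
// *set* of elements per partition is stable even though their arrival order is not.
val airlinesSorted = airlines.mapPartitions(iter => iter.toArray.sorted.iterator)
println(airlinesSorted.first)
```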






[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs

2015-02-25 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14337104#comment-14337104
 ] 

Ilya Ganelin commented on SPARK-5750:
-

I'd be happy to pull those in. Is it fine to submit the PR against this issue?

 Document that ordering of elements in shuffled partitions is not 
 deterministic across runs
 --

 Key: SPARK-5750
 URL: https://issues.apache.org/jira/browse/SPARK-5750
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen

 The ordering of elements in shuffled partitions is not deterministic across 
 runs.  For instance, consider the following example:
 {code}
 val largeFiles = sc.textFile(...)
 val airlines = largeFiles.repartition(2000).cache()
 println(airlines.first)
 {code}
 If this code is run twice, then each run will output a different result.  
 There is non-determinism in the shuffle read code that accounts for this:
 Spark's shuffle read path processes blocks as soon as they are fetched. Spark 
 uses 
 [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala]
  to fetch shuffle data from mappers.  In this code, requests for multiple 
 blocks from the same host are batched together, so nondeterminism in where 
 tasks are run means that the set of requests can vary across runs.  In 
 addition, there's an [explicit 
 call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256]
  to randomize the order of the batched fetch requests.  As a result, shuffle 
 operations cannot be guaranteed to produce the same ordering of the elements 
 in their partitions.
 Therefore, Spark should update its docs to clarify that the ordering of 
 elements in shuffle RDDs' partitions is non-deterministic.  Note, however, 
 that the _set_ of elements in each partition will be deterministic: if we 
 used {{mapPartitions}} to sort each partition, then the {{first()}} call 
 above would produce a deterministic result.






[jira] [Commented] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time

2015-02-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14335646#comment-14335646
 ] 

Ilya Ganelin commented on SPARK-5845:
-

If I understand correctly, the file cleanup happens in 
IndexShuffleBlockManager.removeDataByMap(), which is called from either the 
SortShuffleManager or the SortShuffleWriter. The problem is that these classes 
do not have any knowledge of the currently collected metrics. Furthermore, the 
disk cleanup is, unless configured in the SparkConf, triggered asynchronously 
via the RemoveShuffle() message so there doesn't appear to be a straightforward 
way to provide a set of metrics to be updated. 

Do you have any suggestions for getting around this? Please let me know, thank 
you. 

 Time to cleanup intermediate shuffle files not included in shuffle write time
 -

 Key: SPARK-5845
 URL: https://issues.apache.org/jira/browse/SPARK-5845
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.3.0, 1.2.1
Reporter: Kay Ousterhout
Assignee: Ilya Ganelin
Priority: Minor

 When the disk is contended, I've observed cases when it takes as long as 7 
 seconds to clean up all of the intermediate spill files for a shuffle (when 
 using the sort based shuffle, but bypassing merging because there are <= 200 
 shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
 written from one of the tasks where I observed this).  This is effectively 
 part of the shuffle write time (because it's a necessary side effect of 
 writing data to disk) so should be added to the shuffle write time to 
 facilitate debugging.






[jira] [Comment Edited] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time

2015-02-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14335646#comment-14335646
 ] 

Ilya Ganelin edited comment on SPARK-5845 at 2/24/15 11:19 PM:
---

If I understand correctly, the file cleanup happens in 
IndexShuffleBlockManager.removeDataByMap(), which is called from either the 
SortShuffleManager or the SortShuffleWriter. The problem is that these classes 
do not have any knowledge of the currently collected metrics. Furthermore, the 
disk cleanup is, unless configured in the SparkConf, triggered asynchronously 
via the RemoveShuffle message so there doesn't appear to be a straightforward 
way to provide a set of metrics to be updated. 

Do you have any suggestions for getting around this? Please let me know, thank 
you. 


was (Author: ilganeli):
If I understand correctly, the file cleanup happens in 
IndexShuffleBlockManager.removeDataByMap(), which is called from either the 
SortShuffleManager or the SortShuffleWriter. The problem is that these classes 
do not have any knowledge of the currently collected metrics. Furthermore, the 
disk cleanup is, unless configured in the SparkConf, triggered asynchronously 
via the RemoveShuffle() message so there doesn't appear to be a straightforward 
way to provide a set of metrics to be updated. 

Do you have any suggestions for getting around this? Please let me know, thank 
you. 

 Time to cleanup intermediate shuffle files not included in shuffle write time
 -

 Key: SPARK-5845
 URL: https://issues.apache.org/jira/browse/SPARK-5845
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.3.0, 1.2.1
Reporter: Kay Ousterhout
Assignee: Ilya Ganelin
Priority: Minor

 When the disk is contended, I've observed cases when it takes as long as 7 
 seconds to clean up all of the intermediate spill files for a shuffle (when 
 using the sort based shuffle, but bypassing merging because there are <= 200 
 shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
 written from one of the tasks where I observed this).  This is effectively 
 part of the shuffle write time (because it's a necessary side effect of 
 writing data to disk) so should be added to the shuffle write time to 
 facilitate debugging.






[jira] [Commented] (SPARK-5845) Time to cleanup spilled shuffle files not included in shuffle write time

2015-02-24 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14335751#comment-14335751
 ] 

Ilya Ganelin commented on SPARK-5845:
-

My mistake - missed your comment about the spill files in the detailed 
description. Given that we're interested in cleaning up the spill files which 
appear to be cleaned up in ExternalSorter.stop() (please correct me if I'm 
wrong), I would like to either 

a) Pass the context to the stop() method - this is possible since the 
SortShuffleWriter has visibility of the TaskContext (which in turn stores the 
metrics we're interested in).

b) (My preference since it won't break the existing interface) Surround 
sorter.stop() on line 91 of SortShuffleWriter.scala with a timer. The only 
downside to this second approach is that it will also include the cleanup of 
the partition writers. I'm not sure whether that should be included in this 
time computation. 
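Option (b) would amount to wrapping the {{sorter.stop()}} call in a timer and folding the elapsed time into the shuffle write metrics. The metric accessor below is an assumption for illustration, not the actual API:

```scala
// Sketch of option (b): time the spill-file cleanup in SortShuffleWriter.stop().
val cleanupStart = System.nanoTime()
sorter.stop()  // deletes intermediate spill files (and the partition writers)
val cleanupNanos = System.nanoTime() - cleanupStart
// Hypothetical accessor; fold the cleanup time into shuffle write time:
// shuffleWriteMetrics.incShuffleWriteTime(cleanupNanos)
```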

 Time to cleanup spilled shuffle files not included in shuffle write time
 

 Key: SPARK-5845
 URL: https://issues.apache.org/jira/browse/SPARK-5845
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.3.0, 1.2.1
Reporter: Kay Ousterhout
Assignee: Ilya Ganelin
Priority: Minor

 When the disk is contended, I've observed cases when it takes as long as 7 
 seconds to clean up all of the intermediate spill files for a shuffle (when 
 using the sort based shuffle, but bypassing merging because there are <= 200 
 shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
 written from one of the tasks where I observed this).  This is effectively 
 part of the shuffle write time (because it's a necessary side effect of 
 writing data to disk) so should be added to the shuffle write time to 
 facilitate debugging.






[jira] [Commented] (SPARK-5845) Time to cleanup intermediate shuffle files not included in shuffle write time

2015-02-23 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14333691#comment-14333691
 ] 

Ilya Ganelin commented on SPARK-5845:
-

Hi Kay - I can knock this one out. Thanks. 

 Time to cleanup intermediate shuffle files not included in shuffle write time
 -

 Key: SPARK-5845
 URL: https://issues.apache.org/jira/browse/SPARK-5845
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.3.0, 1.2.1
Reporter: Kay Ousterhout
Priority: Minor

 When the disk is contended, I've observed cases when it takes as long as 7 
 seconds to clean up all of the intermediate spill files for a shuffle (when 
 using the sort based shuffle, but bypassing merging because there are <= 200 
 shuffle partitions).  This is even when the shuffle data is non-huge (152MB 
 written from one of the tasks where I observed this).  This is effectively 
 part of the shuffle write time (because it's a necessary side effect of 
 writing data to disk) so should be added to the shuffle write time to 
 facilitate debugging.






[jira] [Commented] (SPARK-5750) Document that ordering of elements in shuffled partitions is not deterministic across runs

2015-02-23 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14333685#comment-14333685
 ] 

Ilya Ganelin commented on SPARK-5750:
-

Hi Josh - I can knock this out. Thanks.

 Document that ordering of elements in shuffled partitions is not 
 deterministic across runs
 --

 Key: SPARK-5750
 URL: https://issues.apache.org/jira/browse/SPARK-5750
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen

 The ordering of elements in shuffled partitions is not deterministic across 
 runs.  For instance, consider the following example:
 {code}
 val largeFiles = sc.textFile(...)
 val airlines = largeFiles.repartition(2000).cache()
 println(airlines.first)
 {code}
 If this code is run twice, then each run will output a different result.  
 There is non-determinism in the shuffle read code that accounts for this:
 Spark's shuffle read path processes blocks as soon as they are fetched: Spark 
 uses 
 [ShuffleBlockFetcherIterator|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala]
  to fetch shuffle data from mappers.  In this code, requests for multiple 
 blocks from the same host are batched together, so nondeterminism in where 
 tasks are run means that the set of requests can vary across runs.  In 
 addition, there's an [explicit 
 call|https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L256]
  to randomize the order of the batched fetch requests.  As a result, shuffle 
 operations cannot be guaranteed to produce the same ordering of the elements 
 in their partitions.
 Therefore, Spark should update its docs to clarify that the ordering of 
 elements in shuffle RDDs' partitions is non-deterministic.  Note, however, 
 that the _set_ of elements in each partition will be deterministic: if we 
 used {{mapPartitions}} to sort each partition, then the {{first()}} call 
 above would produce a deterministic result.
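The closing point can be shown concretely; a hedged sketch reusing the example above (the input path stays elided as in the original):

```scala
// Non-deterministic across runs: each partition's contents arrive in
// shuffle-fetch order, which varies run to run.
val largeFiles = sc.textFile(...)
val airlines = largeFiles.repartition(2000).cache()
println(airlines.first) // may print a different line on each run

// The SET of elements per partition IS deterministic, so sorting within
// each partition makes first() deterministic.
val sortedWithin = airlines.mapPartitions(
  iter => iter.toArray.sorted.iterator,
  preservesPartitioning = true)
println(sortedWithin.first) // same value on every run
```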






[jira] [Commented] (SPARK-5079) Detect failed jobs / batches in Spark Streaming unit tests

2015-02-23 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14333743#comment-14333743
 ] 

Ilya Ganelin commented on SPARK-5079:
-

Hi [~joshrosen] - I'm trying to wrap my head around the unit tests trying to 
find some specific tests where this is a problem as a baseline. If you could 
highlight a couple of examples as a starting point that would help a lot. 
Thanks!

 Detect failed jobs / batches in Spark Streaming unit tests
 --

 Key: SPARK-5079
 URL: https://issues.apache.org/jira/browse/SPARK-5079
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Reporter: Josh Rosen
Assignee: Ilya Ganelin

 Currently, it is possible to write Spark Streaming unit tests where Spark 
 jobs fail but the streaming tests succeed because we rely on wall-clock time 
 plus output comparision in order to check whether a test has passed, and 
 (read: comparison) 
 hence may miss cases where errors occurred if they didn't affect these 
 results.  We should strengthen the tests to check that no job failures 
 occurred while processing batches.
 See https://github.com/apache/spark/pull/3832#issuecomment-68580794 for 
 additional context.
 The StreamingTestWaiter in https://github.com/apache/spark/pull/3801 might 
 also fix this.
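One way to implement the suggested check, sketched under the assumption of the 1.x SparkListener API (SparkListenerJobEnd and JobFailed are real classes of that era; the wiring into the streaming test harness is illustrative):

```scala
import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd}

// Records job failures so a streaming test can assert that none occurred,
// instead of relying only on wall-clock time plus output comparison.
class JobFailureCollector extends SparkListener {
  @volatile var failedJobs = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
    case JobFailed(_) => failedJobs += 1
    case _            => // successful job; nothing to record
  }
}

// In a test: register the collector on the streaming context's SparkContext,
// run the batches, then assert(collector.failedJobs == 0) alongside the
// existing output comparison.
```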






[jira] [Commented] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin commented on SPARK-4423:
-

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in ```local``` mode) versus the division of labor 
between the driver and the executors (in ```cluster``` mode). This is something 
that's a little un-intuitive and understanding it is vital to understanding 
Spark. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



 Improve foreach() documentation to avoid confusion between local- and 
 cluster-mode behavior
 ---

 Key: SPARK-4423
 URL: https://issues.apache.org/jira/browse/SPARK-4423
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen
Assignee: Ilya Ganelin

 {{foreach}} seems to be a common source of confusion for new users: in 
 {{local}} mode, {{foreach}} can be used to update local variables on the 
 driver, but programs that do this will not work properly when executed on 
 clusters, since the {{foreach}} will update per-executor variables (note that 
 this _will_ work correctly for accumulators, but not for other types of 
 mutable objects).
 Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
 print to the driver's standard output.
 At a minimum, we should improve the documentation to warn users against 
 unsafe uses of {{foreach}} that won't work properly when transitioning from 
 local mode to a real cluster.
 We might also consider changes to local mode so that its behavior more 
 closely matches the cluster modes; this will require some discussion, though, 
 since any change of behavior here would technically be a user-visible 
 backwards-incompatible change (I don't think that we made any explicit 
 guarantees about the current local-mode behavior, but someone might be 
 relying on the current implicit behavior).
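The pitfall in the description can be shown in a few lines; a hedged sketch assuming some existing `rdd` and `sc`, using the pre-2.0 accumulator API:

```scala
// Appears to work in local mode, but on a cluster each executor mutates
// its own deserialized copy of the closure, so the driver's counter
// never changes.
var counter = 0
rdd.foreach(x => counter += 1)

// Accumulators are merged back to the driver, so this is correct in
// both local and cluster modes.
val acc = sc.accumulator(0) // pre-2.0 API; 2.x uses sc.longAccumulator
rdd.foreach(x => acc += 1)
println(acc.value)
```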






[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:46 AM:
--

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue - local execution on 
the driver (in {{local}} mode) versus the division of labor between the driver 
and the executors (in {{cluster}} mode). Specifically, I'd like to discuss 
where the actual data is that the executors are operating on. This also becomes 
useful during performance tuning - for example using mapPartitions to avoid 
shuffle operations, since it ties in with data aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.




was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in {{local}} mode) versus the division of labor 
between the driver and the executors (in {{cluster}} mode). Specifically, I'd 
like to discuss where the actual data is that the executors are operating on. 
This also becomes useful during performance tuning - for example using 
mapPartitions to avoid shuffle operations, since it ties in with data 
aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



 Improve foreach() documentation to avoid confusion between local- and 
 cluster-mode behavior
 ---

 Key: SPARK-4423
 URL: https://issues.apache.org/jira/browse/SPARK-4423
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen
Assignee: Ilya Ganelin

 {{foreach}} seems to be a common source of confusion for new users: in 
 {{local}} mode, {{foreach}} can be used to update local variables on the 
 driver, but programs that do this will not work properly when executed on 
 clusters, since the {{foreach}} will update per-executor variables (note that 
 this _will_ work correctly for accumulators, but not for other types of 
 mutable objects).
 Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
 print to the driver's standard output.
 At a minimum, we should improve the documentation to warn users against 
 unsafe uses of {{foreach}} that won't work properly when transitioning from 
 local mode to a real cluster.
 We might also consider changes to local mode so that its behavior more 
 closely matches the cluster modes; this will require some discussion, though, 
 since any change of behavior here would technically be a user-visible 
 backwards-incompatible change (I don't think that we made any explicit 
 guarantees about the current local-mode behavior, but someone might be 
 relying on the current implicit behavior).






[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:46 AM:
--

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in {{local}} mode) versus the division of labor 
between the driver and the executors (in {{cluster}} mode). Specifically, I'd 
like to discuss where the actual data is that the executors are operating on. 
This also becomes useful during performance tuning - for example using 
mapPartitions to avoid shuffle operations, since it ties in with data 
aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.




was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in {{local}} mode) versus the division of labor 
between the driver and the executors (in {{cluster}} mode). This is something 
that's a little un-intuitive and understanding it is vital to understanding 
Spark. This also becomes useful during performance tuning (for example using 
mapPartitions to avoid shuffle operations). 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



 Improve foreach() documentation to avoid confusion between local- and 
 cluster-mode behavior
 ---

 Key: SPARK-4423
 URL: https://issues.apache.org/jira/browse/SPARK-4423
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen
Assignee: Ilya Ganelin

 {{foreach}} seems to be a common source of confusion for new users: in 
 {{local}} mode, {{foreach}} can be used to update local variables on the 
 driver, but programs that do this will not work properly when executed on 
 clusters, since the {{foreach}} will update per-executor variables (note that 
 this _will_ work correctly for accumulators, but not for other types of 
 mutable objects).
 Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
 print to the driver's standard output.
 At a minimum, we should improve the documentation to warn users against 
 unsafe uses of {{foreach}} that won't work properly when transitioning from 
 local mode to a real cluster.
 We might also consider changes to local mode so that its behavior more 
 closely matches the cluster modes; this will require some discussion, though, 
 since any change of behavior here would technically be a user-visible 
 backwards-incompatible change (I don't think that we made any explicit 
 guarantees about the current local-mode behavior, but someone might be 
 relying on the current implicit behavior).






[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 2:39 AM:
--

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue - local execution on 
the driver (in {{local}} mode) versus the division of labor between the driver 
and the executors (in {{cluster}} mode). Specifically, I'd like to discuss 
where the actual data is that the executors are operating on. This also becomes 
useful during performance tuning - for example using mapPartitions to avoid 
shuffle operations, since it ties in with data aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.


Edit:
Upon further consideration I've realized that the above doesn't quite address 
the spirit of the issue. I think what is really at play here is simply a need 
to explain closures in local vs. cluster modes.  



was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue - local execution on 
the driver (in {{local}} mode) versus the division of labor between the driver 
and the executors (in {{cluster}} mode). Specifically, I'd like to discuss 
where the actual data is that the executors are operating on. This also becomes 
useful during performance tuning - for example using mapPartitions to avoid 
shuffle operations, since it ties in with data aggregation for executors. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



 Improve foreach() documentation to avoid confusion between local- and 
 cluster-mode behavior
 ---

 Key: SPARK-4423
 URL: https://issues.apache.org/jira/browse/SPARK-4423
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen
Assignee: Ilya Ganelin

 {{foreach}} seems to be a common source of confusion for new users: in 
 {{local}} mode, {{foreach}} can be used to update local variables on the 
 driver, but programs that do this will not work properly when executed on 
 clusters, since the {{foreach}} will update per-executor variables (note that 
 this _will_ work correctly for accumulators, but not for other types of 
 mutable objects).
 Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
 print to the driver's standard output.
 At a minimum, we should improve the documentation to warn users against 
 unsafe uses of {{foreach}} that won't work properly when transitioning from 
 local mode to a real cluster.
 We might also consider changes to local mode so that its behavior more 
 closely matches the cluster modes; this will require some discussion, though, 
 since any change of behavior here would technically be a user-visible 
 backwards-incompatible change (I don't think that we made any explicit 
 guarantees about the current local-mode behavior, but someone might be 
 relying on the current implicit behavior).






[jira] [Comment Edited] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-11 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317334#comment-14317334
 ] 

Ilya Ganelin edited comment on SPARK-4423 at 2/12/15 1:43 AM:
--

Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in {{local}} mode) versus the division of labor 
between the driver and the executors (in {{cluster}} mode). This is something 
that's a little un-intuitive and understanding it is vital to understanding 
Spark. This also becomes useful during performance tuning (for example using 
mapPartitions to avoid shuffle operations). 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.




was (Author: ilganeli):
Hi [~pwendell] and [~joshrosen], how do you guys feel about my adding a section 
to the Spark Programming Guide that discusses this issue specifically - local 
execution on the driver (in ```local``` mode) versus the division of labor 
between the driver and the executors (in ```cluster``` mode). This is something 
that's a little un-intuitive and understanding it is vital to understanding 
Spark. 

This section could be referenced within the shorter description for foreach, 
map, mapPartitions, mapPartitionsWithIndex, and flatMap or some other set of 
operators we care about.



 Improve foreach() documentation to avoid confusion between local- and 
 cluster-mode behavior
 ---

 Key: SPARK-4423
 URL: https://issues.apache.org/jira/browse/SPARK-4423
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen
Assignee: Ilya Ganelin

 {{foreach}} seems to be a common source of confusion for new users: in 
 {{local}} mode, {{foreach}} can be used to update local variables on the 
 driver, but programs that do this will not work properly when executed on 
 clusters, since the {{foreach}} will update per-executor variables (note that 
 this _will_ work correctly for accumulators, but not for other types of 
 mutable objects).
 Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
 print to the driver's standard output.
 At a minimum, we should improve the documentation to warn users against 
 unsafe uses of {{foreach}} that won't work properly when transitioning from 
 local mode to a real cluster.
 We might also consider changes to local mode so that its behavior more 
 closely matches the cluster modes; this will require some discussion, though, 
 since any change of behavior here would technically be a user-visible 
 backwards-incompatible change (I don't think that we made any explicit 
 guarantees about the current local-mode behavior, but someone might be 
 relying on the current implicit behavior).






[jira] [Commented] (SPARK-4655) Split Stage into ShuffleMapStage and ResultStage subclasses

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312458#comment-14312458
 ] 

Ilya Ganelin commented on SPARK-4655:
-

Hi [~joshrosen], I'd be happy to work on this. Thanks!

 Split Stage into ShuffleMapStage and ResultStage subclasses
 ---

 Key: SPARK-4655
 URL: https://issues.apache.org/jira/browse/SPARK-4655
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Reporter: Josh Rosen
Assignee: Josh Rosen

 The scheduler's {{Stage}} class has many fields which are only applicable to 
 result stages or shuffle map stages.  As a result, I think that it makes 
 sense to make {{Stage}} into an abstract base class with two subclasses, 
 {{ResultStage}} and {{ShuffleMapStage}}.  This would improve the 
 understandability of the DAGScheduler code. 
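A hedged sketch of the proposed split; the constructor fields are illustrative, not the actual DAGScheduler fields:

```scala
// Fields common to both stage kinds stay on the abstract base class;
// shuffle-only and result-only fields move to the subclasses.
abstract class Stage(val id: Int, val rdd: RDD[_], val numTasks: Int)

class ShuffleMapStage(id: Int, rdd: RDD[_], numTasks: Int,
    val shuffleDep: ShuffleDependency[_, _, _]) // only shuffle map stages have this
  extends Stage(id, rdd, numTasks)

class ResultStage(id: Int, rdd: RDD[_], numTasks: Int,
    val func: (TaskContext, Iterator[_]) => _)  // only result stages have this
  extends Stage(id, rdd, numTasks)
```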






[jira] [Commented] (SPARK-4423) Improve foreach() documentation to avoid confusion between local- and cluster-mode behavior

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312424#comment-14312424
 ] 

Ilya Ganelin commented on SPARK-4423:
-

I'll be happy to update this. Thank you.

 Improve foreach() documentation to avoid confusion between local- and 
 cluster-mode behavior
 ---

 Key: SPARK-4423
 URL: https://issues.apache.org/jira/browse/SPARK-4423
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Reporter: Josh Rosen

 {{foreach}} seems to be a common source of confusion for new users: in 
 {{local}} mode, {{foreach}} can be used to update local variables on the 
 driver, but programs that do this will not work properly when executed on 
 clusters, since the {{foreach}} will update per-executor variables (note that 
 this _will_ work correctly for accumulators, but not for other types of 
 mutable objects).
 Similarly, I've seen users become confused when {{.foreach(println)}} doesn't 
 print to the driver's standard output.
 At a minimum, we should improve the documentation to warn users against 
 unsafe uses of {{foreach}} that won't work properly when transitioning from 
 local mode to a real cluster.
 We might also consider changes to local mode so that its behavior more 
 closely matches the cluster modes; this will require some discussion, though, 
 since any change of behavior here would technically be a user-visible 
 backwards-incompatible change (I don't think that we made any explicit 
 guarantees about the current local-mode behavior, but someone might be 
 relying on the current implicit behavior).






[jira] [Commented] (SPARK-5570) No docs stating that `new SparkConf().set("spark.driver.memory", ...)` will not work

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312416#comment-14312416
 ] 

Ilya Ganelin commented on SPARK-5570:
-

I'll fix this, can you please assign it to me? Thanks.

 No docs stating that `new SparkConf().set("spark.driver.memory", ...)` will 
 not work
 ---

 Key: SPARK-5570
 URL: https://issues.apache.org/jira/browse/SPARK-5570
 Project: Spark
  Issue Type: Bug
  Components: Documentation, Spark Core
Affects Versions: 1.2.0
Reporter: Tathagata Das
Assignee: Andrew Or
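For context, the behavior the missing docs should describe: spark.driver.memory is consumed when the driver JVM is launched, so setting it programmatically inside an already-running driver has no effect. A hedged sketch:

```scala
// Has NO effect when run inside an already-started driver JVM:
val conf = new SparkConf().set("spark.driver.memory", "4g")
val sc = new SparkContext(conf) // driver heap size was fixed at JVM launch

// Works: supply the setting before the driver starts, e.g.
//   spark-submit --driver-memory 4g ...
// or put "spark.driver.memory 4g" in conf/spark-defaults.conf
```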








[jira] [Commented] (SPARK-5079) Detect failed jobs / batches in Spark Streaming unit tests

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312415#comment-14312415
 ] 

Ilya Ganelin commented on SPARK-5079:
-

I can work on this - can you please assign it to me? Thank you. 

 Detect failed jobs / batches in Spark Streaming unit tests
 --

 Key: SPARK-5079
 URL: https://issues.apache.org/jira/browse/SPARK-5079
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Reporter: Josh Rosen

 Currently, it is possible to write Spark Streaming unit tests where Spark 
 jobs fail but the streaming tests succeed because we rely on wall-clock time 
 plus output comparison in order to check whether a test has passed, and 
 hence may miss cases where errors occurred if they didn't affect these 
 results.  We should strengthen the tests to check that no job failures 
 occurred while processing batches.
 See https://github.com/apache/spark/pull/3832#issuecomment-68580794 for 
 additional context.
 The StreamingTestWaiter in https://github.com/apache/spark/pull/3801 might 
 also fix this.






[jira] [Comment Edited] (SPARK-5570) No docs stating that `new SparkConf().set("spark.driver.memory", ...)` will not work

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312416#comment-14312416
 ] 

Ilya Ganelin edited comment on SPARK-5570 at 2/9/15 4:27 PM:
-

I would be happy to fix this. Thank you. 


was (Author: ilganeli):
I'll fix this, can you please assign it to me? Thanks.

 No docs stating that `new SparkConf().set("spark.driver.memory", ...)` will 
 not work
 ---

 Key: SPARK-5570
 URL: https://issues.apache.org/jira/browse/SPARK-5570
 Project: Spark
  Issue Type: Bug
  Components: Documentation, Spark Core
Affects Versions: 1.2.0
Reporter: Tathagata Das
Assignee: Andrew Or








[jira] [Commented] (SPARK-823) spark.default.parallelism's default is inconsistent across scheduler backends

2015-02-09 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312382#comment-14312382
 ] 

Ilya Ganelin commented on SPARK-823:


Hi [~joshrosen] I believe the documentation is up to date and I reviewed all 
usages of spark.default.parallelism and found no inconsistencies with the 
documentation. The only thing that is un-documented with regards to the usage 
of spark.default.parallelism is how it's used within the Partitioner class in 
both Spark and Python. If defined, the default number of partitions created is 
equal to spark.default.parallelism - otherwise, it's the local number of 
partitions. I think this issue can be closed - I don't think that particular 
case needs to be publicly documented (it's clearly evident in the code what is 
going on). 

 spark.default.parallelism's default is inconsistent across scheduler backends
 -

 Key: SPARK-823
 URL: https://issues.apache.org/jira/browse/SPARK-823
 Project: Spark
  Issue Type: Bug
  Components: Documentation, PySpark, Scheduler
Affects Versions: 0.8.0, 0.7.3, 0.9.1
Reporter: Josh Rosen
Priority: Minor

 The [0.7.3 configuration 
 guide|http://spark-project.org/docs/latest/configuration.html] says that 
 {{spark.default.parallelism}}'s default is 8, but the default is actually 
 max(totalCoreCount, 2) for the standalone scheduler backend, 8 for the Mesos 
 scheduler, and {{threads}} for the local scheduler:
 https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala#L157
 https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala#L317
 https://github.com/mesos/spark/blob/v0.7.3/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala#L150
 Should this be clarified in the documentation?  Should the Mesos scheduler 
 backend's default be revised?
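The three per-backend defaults quoted above can be summarized in one hedged sketch (plain Scala mirroring the linked v0.7.3 source lines, not the actual scheduler code):

```scala
// Fallback used when spark.default.parallelism is unset, per backend:
def defaultParallelism(backend: String, totalCoreCount: Int, localThreads: Int): Int =
  backend match {
    case "standalone" => math.max(totalCoreCount, 2) // StandaloneSchedulerBackend
    case "mesos"      => 8                           // MesosSchedulerBackend
    case "local"      => localThreads                // LocalScheduler
  }
```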






[jira] [Comment Edited] (SPARK-2584) Do not mutate block storage level on the UI

2015-01-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14273877#comment-14273877
 ] 

Ilya Ganelin edited comment on SPARK-2584 at 1/12/15 7:08 PM:
--

Understood, I am able to recreate this issue in 1.1. I'll work on a fix to 
clarify what's going on. Thanks.



was (Author: ilganeli):
Understood, I was looking at the UI for Spark 1.1 and did not see the block 
storage level represented as MEMORY_AND_DISK or DISK_ONLY. It's now presented 
as Memory Deserialized or Disk Deserialized. I'll attempt to recreate this 
problem in the newer version of Spark but wanted to know if you've seen it 
since 1.0.1. 

 Do not mutate block storage level on the UI
 ---

 Key: SPARK-2584
 URL: https://issues.apache.org/jira/browse/SPARK-2584
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, Web UI
Affects Versions: 1.0.1
Reporter: Andrew Or

 If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes 
 DISK_ONLY on the UI. We should preserve the original storage level proposed 
 by the user, in addition to the change in actual storage level.
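One way to realize the proposal, sketched with hypothetical names:

```scala
// Track both the level the user requested and the level the block
// currently occupies, so the UI can show e.g. "MEMORY_AND_DISK
// (currently on disk)" instead of silently mutating to DISK_ONLY.
case class BlockUIStatus(
    requestedLevel: StorageLevel, // what the user asked for; never mutated
    currentLevel: StorageLevel)   // where the block actually lives now
```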






[jira] [Commented] (SPARK-2584) Do not mutate block storage level on the UI

2015-01-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273877#comment-14273877
 ] 

Ilya Ganelin commented on SPARK-2584:
-

Understood, I was looking at the UI for Spark 1.1 and did not see the block 
storage level represented as MEMORY_AND_DISK or DISK_ONLY. It's now presented 
as Memory Deserialized or Disk Deserialized. I'll attempt to recreate this 
problem in the newer version of Spark but wanted to know if you've seen it 
since 1.0.1. 

 Do not mutate block storage level on the UI
 ---

 Key: SPARK-2584
 URL: https://issues.apache.org/jira/browse/SPARK-2584
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, Web UI
Affects Versions: 1.0.1
Reporter: Andrew Or

 If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes 
 DISK_ONLY on the UI. We should preserve the original storage level proposed 
 by the user, in addition to the change in actual storage level.






[jira] [Commented] (SPARK-2584) Do not mutate block storage level on the UI

2015-01-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273762#comment-14273762
 ] 

Ilya Ganelin commented on SPARK-2584:
-

Hi Andrew, a question about this: when you say we drop it from memory, what 
mechanism are you referring to? It's illegal to change the persistence level 
of an already-persisted RDD, and if you call unpersist() the RDD is dropped 
from both memory and disk storage. How would an RDD be dropped from memory? 

 Do not mutate block storage level on the UI
 ---

 Key: SPARK-2584
 URL: https://issues.apache.org/jira/browse/SPARK-2584
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, Web UI
Affects Versions: 1.0.1
Reporter: Andrew Or

 If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes 
 DISK_ONLY on the UI. We should preserve the original storage level proposed 
 by the user, in addition to the change in actual storage level.






[jira] [Comment Edited] (SPARK-2584) Do not mutate block storage level on the UI

2015-01-12 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273762#comment-14273762
 ] 

Ilya Ganelin edited comment on SPARK-2584 at 1/12/15 4:47 PM:
--

Hi Andrew, a question about this: when you say we drop it from memory, what 
mechanism are you referring to? It's illegal to change the persistence level 
of an already-persisted RDD, and if you call unpersist() the RDD is dropped 
from both memory and disk storage. How would an RDD be dropped from memory? 
I'm just trying to reproduce the issue before creating a fix. 


was (Author: ilganeli):
Hi Andrew, question about this. When you say we drop it from memory what 
mechanism are you talking about? It's illegal to change the persistence level 
of an already persisted RDD and if you call unpersist() it's dropped from both 
memory and disk storage. How would an RDD be dropped from memory? 

 Do not mutate block storage level on the UI
 ---

 Key: SPARK-2584
 URL: https://issues.apache.org/jira/browse/SPARK-2584
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, Web UI
Affects Versions: 1.0.1
Reporter: Andrew Or

 If a block is stored MEMORY_AND_DISK and we drop it from memory, it becomes 
 DISK_ONLY on the UI. We should preserve the original storage level proposed 
 by the user, in addition to the change in actual storage level.






[jira] [Commented] (SPARK-3885) Provide mechanism to remove accumulators once they are no longer used

2015-01-07 Thread Ilya Ganelin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14268079#comment-14268079
 ] 

Ilya Ganelin commented on SPARK-3885:
-

Hi [~joshrosen], I can knock this one out - could you please assign it to me? 
One minor question: both here and in the code there is a TODO recommending 
soft references. However, a soft reference is not released by default, only 
when the garbage collector actually needs more memory. Is there any reason it 
can't be made a weak reference instead?
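The soft-vs-weak distinction raised above can be shown with plain JVM code. This is a minimal, self-contained sketch (the class name `RefDemo` is invented; this is not Spark code): once the only strong reference to an object is dropped, a `WeakReference` to it is cleared at the next garbage collection, while a `SoftReference` is kept until the JVM is actually short on memory.

```java
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;

public class RefDemo {
    public static void main(String[] args) throws InterruptedException {
        Object weakTarget = new byte[16];
        Object softTarget = new byte[16];
        WeakReference<Object> weak = new WeakReference<>(weakTarget);
        SoftReference<Object> soft = new SoftReference<>(softTarget);

        // Drop the only strong references to both objects.
        weakTarget = null;
        softTarget = null;

        // System.gc() is only a hint, but HotSpot normally honors it.
        System.gc();
        Thread.sleep(50);

        // Weakly reachable objects are cleared at the next collection;
        // softly reachable ones survive until there is memory pressure.
        System.out.println("weak cleared: " + (weak.get() == null)); // typically true
        System.out.println("soft cleared: " + (soft.get() == null)); // typically false
    }
}
```

This is why a registry of soft references would keep large accumulators alive indefinitely on an uncontended heap, whereas weak references let them be collected as soon as the driver stops using them.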

 Provide mechanism to remove accumulators once they are no longer used
 -

 Key: SPARK-3885
 URL: https://issues.apache.org/jira/browse/SPARK-3885
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.2, 1.1.0, 1.2.0
Reporter: Josh Rosen

 Spark does not currently provide any mechanism to delete accumulators after 
 they are no longer used.  This can lead to OOMs for long-lived SparkContexts 
 that create many large accumulators.
 Part of the problem is that accumulators are registered in a global 
 {{Accumulators}} registry.  Maybe the fix would be as simple as using weak 
 references in the Accumulators registry so that accumulators can be GC'd once 
 they can no longer be used.
 In the meantime, here's a workaround that users can try:
 Accumulators have a public setValue() method that can be called (only by the 
 driver) to change an accumulator’s value.  You might be able to use this to 
 reset accumulators’ values to smaller objects (e.g. the “zero” object of 
 whatever your accumulator type is, or ‘null’ if you’re sure that the 
 accumulator will never be accessed again).
 This issue was originally reported by [~nkronenfeld] on the dev mailing list: 
 http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-Accumulator-question-td8709.html
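The weak-reference fix floated in the description can be sketched as a registry that maps ids to `WeakReference`s, so an entry becomes collectable as soon as the caller drops its last strong reference. All names here (`WeakRegistry`, `register`, `expunge`) are invented for illustration; this is not Spark's actual `Accumulators` registry.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WeakRegistry<T> {
    private final Map<Long, WeakReference<T>> entries = new HashMap<>();
    private long nextId = 0;

    // Register a value; the registry itself never keeps it alive.
    public synchronized long register(T value) {
        long id = nextId++;
        entries.put(id, new WeakReference<>(value));
        return id;
    }

    // Drop entries whose referents have been garbage-collected.
    public synchronized void expunge() {
        for (Iterator<Map.Entry<Long, WeakReference<T>>> it =
                 entries.entrySet().iterator(); it.hasNext();) {
            if (it.next().getValue().get() == null) {
                it.remove();
            }
        }
    }

    public synchronized int size() {
        expunge();
        return entries.size();
    }
}
```

In practice `java.util.WeakHashMap` (keyed on the value itself) gives the same behavior without a manual expunge step; the explicit queue-free version above just makes the lifecycle visible.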





