[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51439437
  
QA tests have started for PR 1507. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18113/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51442968
  
QA results for PR 1507:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18113/consoleFull





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-07 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51497079
  
It's failing the 3 flaky tests that have been failing many PRs lately... 
test this please





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-07 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51497616
  
QA tests have started for PR 1507. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18126/consoleFull





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-07 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15972438
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -81,11 +83,26 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None

   /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
+   * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
   private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

-  def shuffleReadMetrics = _shuffleReadMetrics
+  def shuffleReadMetrics() = _shuffleReadMetrics
--- End diff --

nit: since this doesn't mutate internal state the original lack of 
parentheses is correct style.
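
The convention behind this nit is the usual Scala one: a parameterless method that only reads state is declared and called without parentheses, while `()` signals a side effect. A minimal sketch, using a hypothetical `Counter` class that is not part of Spark:

```scala
// Hypothetical class for illustration only; not part of Spark.
class Counter {
  private var _count: Int = 0

  // Pure accessor: no side effects, so it is declared without parentheses.
  def count: Int = _count

  // Mutates internal state, so it is declared (and called) with parentheses.
  def increment(): Unit = {
    _count += 1
  }
}
```

A caller would then write `c.increment()` but `c.count`, which makes the side-effecting call visually distinct from the read.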





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-07 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51552698
  
Okay I merged this with the minor style change. Thanks Sandy!





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1507





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15900998
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -191,7 +184,7 @@ object BlockFetcherIterator {
         }
       }
       logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
-        (numLocal + numRemote) + " blocks")
+        totalBlocks + " blocks")
--- End diff --

Naw





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51395051
  
QA tests have started for PR 1507. This patch DID NOT merge cleanly!
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18047/consoleFull





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15903044
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -98,19 +105,31 @@ class TaskMetrics extends Serializable {
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

-  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
-  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
-    _shuffleReadMetrics match {
-      case Some(existingMetrics) =>
-        existingMetrics.shuffleFinishTime = math.max(
-          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
-        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
-        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
-        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
-      case None =>
-        _shuffleReadMetrics = Some(newMetrics)
+  /**
+   * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
+   * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
+   * dependency, and merge these metrics before reporting them to the driver. This method returns
+   * a ShuffleReadMetrics for a dependency and registers it for merging later.
+   */
+  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
--- End diff --

can this be private[spark]?





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15903066
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -98,19 +105,31 @@ class TaskMetrics extends Serializable {
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

-  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
-  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
-    _shuffleReadMetrics match {
-      case Some(existingMetrics) =>
-        existingMetrics.shuffleFinishTime = math.max(
-          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
-        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
-        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
-        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
-      case None =>
-        _shuffleReadMetrics = Some(newMetrics)
+  /**
+   * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
+   * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
+   * dependency, and merge these metrics before reporting them to the driver. This method returns
+   * a ShuffleReadMetrics for a dependency and registers it for merging later.
+   */
+  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
+    val readMetrics = new ShuffleReadMetrics()
+    depsShuffleReadMetrics += readMetrics
+    readMetrics
+  }
+
+  /**
+   * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
+   */
+  def mergeShuffleReadMetrics() = synchronized {
--- End diff --

can this be private[spark]?





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51396408
  
QA tests have started for PR 1507. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18049/consoleFull





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15905625
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None

   /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
+   * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
-  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
--- End diff --

Can we add a `private[spark]` setter for it and explain it should only be
used when recreating objects from JSON? I find it weird to expose this as a var
since now users should not ever modify this directly - if someone looked at
this class that would be non-obvious. So I'd say we keep the var private, add a
`private[spark]` setter, and say in the doc that it should only be used during
JSON deserialization.
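
The shape being proposed can be sketched roughly as follows. This is an illustration under assumptions, not the code that was merged: the enclosing `sparkdemo` object stands in for the `spark` package so that the qualified-private modifier compiles in a single file, and `setShuffleReadMetrics` and `taskMetricsFromRemoteBytes` are hypothetical names.

```scala
// Hypothetical stand-ins for illustration; the enclosing object plays the role
// of the `spark` package so that the qualified-private modifier compiles here.
object sparkdemo {
  class ShuffleReadMetrics {
    var remoteBytesRead: Long = 0L
  }

  class TaskMetrics {
    // The backing field stays private, so ordinary callers cannot reassign it.
    private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

    // Read-only accessor, declared without parentheses because it has no side effects.
    def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics

    /** Should only be used when recreating TaskMetrics from JSON. */
    private[sparkdemo] def setShuffleReadMetrics(
        metrics: Option[ShuffleReadMetrics]): Unit = {
      _shuffleReadMetrics = metrics
    }
  }

  // Hypothetical deserialization helper living in the same scope as the setter.
  def taskMetricsFromRemoteBytes(remoteBytesRead: Long): TaskMetrics = {
    val readMetrics = new ShuffleReadMetrics
    readMetrics.remoteBytesRead = remoteBytesRead
    val metrics = new TaskMetrics
    metrics.setShuffleReadMetrics(Some(readMetrics))
    metrics
  }
}
```

Code outside `sparkdemo` can still read `shuffleReadMetrics` but cannot call the setter, which keeps the mutable surface limited to the deserialization path.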





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15905777
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None

   /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
+   * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
-  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
--- End diff --

This plan seems good to me (sorry for the slow response here!)





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15906274
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None

   /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
+   * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
-  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
--- End diff --

Does the same logic not apply for all the other fields in TaskMetrics that 
are vars?





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51403107
  
QA results for PR 1507:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18049/consoleFull





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-51404085
  
QA results for PR 1507:
- This patch PASSES unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18047/consoleFull





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15906866
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -98,19 +105,31 @@ class TaskMetrics extends Serializable {
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

-  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
-  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
-    _shuffleReadMetrics match {
-      case Some(existingMetrics) =>
-        existingMetrics.shuffleFinishTime = math.max(
-          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
-        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
-        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
-        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
-      case None =>
-        _shuffleReadMetrics = Some(newMetrics)
+  /**
+   * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
+   * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
+   * dependency, and merge these metrics before reporting them to the driver. This method returns
+   * a ShuffleReadMetrics for a dependency and registers it for merging later.
+   */
+  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
+    val readMetrics = new ShuffleReadMetrics()
+    depsShuffleReadMetrics += readMetrics
+    readMetrics
+  }
+
+  /**
+   * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
+   */
+  def mergeShuffleReadMetrics() = synchronized {
--- End diff --

Also I think this should be called `updateShuffleReadMetrics` or something
since it mutates this object in place. We have functions named `mergeX` in
several other places in the Spark code, e.g. `mergeCombiners`, and they all
return new objects rather than mutating an existing one.
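
To make the per-dependency scheme and the naming point concrete, here is a simplified sketch. The field and method names follow the quoted diff, but the classes are stand-ins rather than the actual Spark implementations, and the in-place method is given the suggested `update` name as an assumption about the final API.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for illustration; not the actual Spark classes.
class ShuffleReadMetrics {
  var shuffleFinishTime: Long = 0L
  var fetchWaitTime: Long = 0L
  var localBlocksFetched: Int = 0
  var remoteBlocksFetched: Int = 0
  var remoteBytesRead: Long = 0L
}

class TaskMetrics {
  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
  private val depsShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]()

  def shuffleReadMetrics: Option[ShuffleReadMetrics] = synchronized { _shuffleReadMetrics }

  // Each shuffle reader gets its own metrics object, registered for later aggregation.
  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
    val readMetrics = new ShuffleReadMetrics()
    depsShuffleReadMetrics += readMetrics
    readMetrics
  }

  // Named "update" rather than "merge" because it mutates this object in place
  // instead of returning a new one. The assignment happens under the same lock
  // as the aggregation.
  def updateShuffleReadMetrics(): Unit = synchronized {
    val merged = new ShuffleReadMetrics()
    for (depMetrics <- depsShuffleReadMetrics) {
      merged.fetchWaitTime += depMetrics.fetchWaitTime
      merged.localBlocksFetched += depMetrics.localBlocksFetched
      merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
      merged.remoteBytesRead += depMetrics.remoteBytesRead
      merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
    }
    _shuffleReadMetrics = Some(merged)
  }
}
```

Because every call rebuilds the aggregate from the registered per-dependency objects, it can be invoked repeatedly while a task is still running without double counting.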





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15907133
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None

   /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
+   * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
-  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
--- End diff --

By users I mean internal Spark code that instruments this. I think it is
counterintuitive to make something a var (and to expose it as a var publicly)
when there is actually no intended use where someone modifies this var. There is
a corner case here where we deserialize an object from JSON, so I'd propose we
create a very narrow interface to deal with that case: a setter with clear
documentation. I really think object deserialization is a special case here...
if something is mutable for the purpose of deserializing, IMO, that's quite
different from making it mutable outright for normal program execution.





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-05 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15796125
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -98,19 +105,22 @@ class TaskMetrics extends Serializable {
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

-  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
-  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
-    _shuffleReadMetrics match {
-      case Some(existingMetrics) =>
-        existingMetrics.shuffleFinishTime = math.max(
-          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
-        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
-        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
-        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
-      case None =>
-        _shuffleReadMetrics = Some(newMetrics)
+  def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
+    val readMetrics = new ShuffleReadMetrics()
+    depsShuffleReadMetrics += readMetrics
+    readMetrics
+  }
+
+  def mergeShuffleReadMetrics() = synchronized {
--- End diff --

Could you add a brief comment on what this does? (and when this happens)





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-05 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15796149
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -131,7 +122,9 @@ object BlockFetcherIterator {
             val networkSize = blockMessage.getData.limit()
             results.put(new FetchResult(blockId, sizeMap(blockId),
               () => dataDeserialize(blockId, blockMessage.getData, serializer)))
-            _remoteBytesRead += networkSize
+            // TODO: race conditions can occur here with NettyBlockFetcherIterator
--- End diff --

Also this comment is pretty vague. It would be good if you could elaborate 
on it (what you described in the JIRA itself is good enough)
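
For readers without the JIRA at hand: assuming the race is the usual lost-update hazard of `+=` on a shared `var` incremented from several threads, the sketch below shows one way to make such a counter safe. It is not the approach taken in this PR (which leaves the issue as a TODO); `BytesReadCounter` and `BytesReadCounterDemo` are hypothetical names, and `AtomicLong` is used only as a familiar illustration.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical counter for illustration; not the approach taken in the PR.
class BytesReadCounter {
  private val remoteBytesRead = new AtomicLong(0L)

  // addAndGet performs the read-modify-write atomically, so concurrent callers
  // cannot overwrite each other's increments.
  def add(networkSize: Long): Long = remoteBytesRead.addAndGet(networkSize)

  def total: Long = remoteBytesRead.get()
}

object BytesReadCounterDemo {
  // Starts `numThreads` threads that each add `size` bytes `perThread` times,
  // waits for all of them, and returns the final total.
  def totalAfter(numThreads: Int, perThread: Int, size: Long): Long = {
    val counter = new BytesReadCounter
    val threads = (1 to numThreads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          var i = 0
          while (i < perThread) {
            counter.add(size)
            i += 1
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    counter.total
  }
}
```

With a plain `var total: Long` and `total += size` in place of the atomic, the same driver could return less than `numThreads * perThread * size`, because two threads may read the same old value before either writes back.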





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-04 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15796012
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -131,7 +122,9 @@ object BlockFetcherIterator {
             val networkSize = blockMessage.getData.limit()
             results.put(new FetchResult(blockId, sizeMap(blockId),
               () => dataDeserialize(blockId, blockMessage.getData, serializer)))
-            _remoteBytesRead += networkSize
+            // TODO: race conditions can occur here with NettyBlockFetcherIterator
--- End diff --

Could you add a reference to the JIRA to the comment





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-04 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15796079
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -191,7 +184,7 @@ object BlockFetcherIterator {
         }
       }
       logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
-        (numLocal + numRemote) + " blocks")
+        totalBlocks + " blocks")
--- End diff --

Is this ever used other than for logging?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-02 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-50976412
  
Just tested this and observed the shuffle bytes read going up for 
in-progress tasks.





[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-08-01 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15716773
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
@@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None

   /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
+   * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
-  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
--- End diff --

Making depsShuffleReadMetrics private.  shuffleReadMetrics still needs to 
be set inside of JsonProtocol, so making it private would either require 
updating it through createShuffleReadMetricsForDependency or adding a setter, 
both of which seem a little weird to me.  Any thoughts?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15238649
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
---
@@ -90,19 +94,18 @@ class TaskMetrics extends Serializable {
*/
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
 
-  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
-  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
-    _shuffleReadMetrics match {
-      case Some(existingMetrics) =>
-        existingMetrics.shuffleFinishTime = math.max(
-          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
-        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
-        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
-        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
-      case None =>
-        _shuffleReadMetrics = Some(newMetrics)
+  def mergeShuffleReadMetrics() {
+    val merged = new ShuffleReadMetrics()
+    synchronized {
+      for (depMetrics <- depsShuffleReadMetrics) {
+        merged.fetchWaitTime += depMetrics.fetchWaitTime
+        merged.localBlocksFetched += depMetrics.localBlocksFetched
+        merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
+        merged.remoteBytesRead += depMetrics.remoteBytesRead
+        merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
+      }
     }
+    shuffleReadMetrics = Some(merged)
--- End diff --

Why is this outside the synchronized block?
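
For illustration only, here is a minimal sketch of what the question points at: the merge loop and the assignment of the merged result both happen under the same lock, so a reader never sees the per-dependency list and the aggregate out of step. This is not the code from the PR. ReadMetricsSketch, TaskMetricsSketch and addDependencyMetrics are hypothetical stand-ins that carry only the fields named in the diff.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ShuffleReadMetrics (fields taken from the diff above).
class ReadMetricsSketch {
  var shuffleFinishTime: Long = 0L
  var fetchWaitTime: Long = 0L
  var localBlocksFetched: Int = 0
  var remoteBlocksFetched: Int = 0
  var remoteBytesRead: Long = 0L
}

// Hypothetical stand-in for TaskMetrics.
class TaskMetricsSketch {
  private val depsShuffleReadMetrics = new ArrayBuffer[ReadMetricsSketch]()
  private var _shuffleReadMetrics: Option[ReadMetricsSketch] = None

  // Reads take the same lock as writes.
  def shuffleReadMetrics: Option[ReadMetricsSketch] = synchronized { _shuffleReadMetrics }

  // Hypothetical helper: registers the metrics object of one shuffle dependency.
  def addDependencyMetrics(m: ReadMetricsSketch): Unit = synchronized {
    depsShuffleReadMetrics += m
  }

  // The whole merge, including the final assignment, runs inside synchronized.
  def mergeShuffleReadMetrics(): Unit = synchronized {
    val merged = new ReadMetricsSketch()
    for (depMetrics <- depsShuffleReadMetrics) {
      merged.fetchWaitTime += depMetrics.fetchWaitTime
      merged.localBlocksFetched += depMetrics.localBlocksFetched
      merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
      merged.remoteBytesRead += depMetrics.remoteBytesRead
      merged.shuffleFinishTime =
        math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
    }
    _shuffleReadMetrics = Some(merged)
  }
}
```

Whether the extra locking matters depends on which threads read the aggregate, which this thread does not spell out.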




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15238692
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---
@@ -20,6 +20,8 @@ package org.apache.spark.shuffle.hash
 import org.apache.spark.{InterruptibleIterator, TaskContext}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
+import org.apache.spark.executor.ShuffleReadMetrics
+import scala.collection.mutable.ArrayBuffer
--- End diff --

import ordering




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15238657
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
---
@@ -19,6 +19,7 @@ package org.apache.spark.executor
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.{BlockId, BlockStatus}
+import scala.collection.mutable.ArrayBuffer
--- End diff --

nit: Import ordering




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15238878
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -31,6 +31,7 @@ import org.apache.spark.network.ConnectionManagerId
 import org.apache.spark.network.netty.ShuffleCopier
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
+import org.apache.spark.executor.ShuffleReadMetrics
--- End diff --

import ordering :)




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15238860
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala ---
@@ -35,8 +37,15 @@ class HashShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+    val readMetrics = new ShuffleReadMetrics()
+    context.taskMetrics.synchronized {
+      if (context.taskMetrics.depsShuffleReadMetrics == null) {
+        context.taskMetrics.depsShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]()
+      }
+      context.taskMetrics.depsShuffleReadMetrics += readMetrics
+    }
--- End diff --

Can we make getNewShuffleReadMetrics() or something a function in 
TaskMetrics that does this?  I think it would be better to have TaskMetrics 
handle the synchronization, because it makes it more obvious to someone reading 
/ modifying the code later what the concurrency properties of 
depsShuffleReadMetrics are.
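
A rough sketch of that suggestion, under stated assumptions: the metrics holder owns the list and the lock, and callers only ask it for a fresh per-dependency object. The method name createShuffleReadMetricsForDependency is borrowed from later comments in this thread. DepReadMetrics, TaskMetricsRegistrySketch and dependencyCount are hypothetical names used only for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ShuffleReadMetrics.
class DepReadMetrics {
  var remoteBytesRead: Long = 0L
}

// Hypothetical stand-in for TaskMetrics: the list is private, so every
// access to it goes through methods that synchronize on this object.
class TaskMetricsRegistrySketch {
  private val depsShuffleReadMetrics = new ArrayBuffer[DepReadMetrics]()

  // Creates, registers and returns the metrics object for one shuffle dependency.
  def createShuffleReadMetricsForDependency(): DepReadMetrics = synchronized {
    val readMetrics = new DepReadMetrics()
    depsShuffleReadMetrics += readMetrics
    readMetrics
  }

  // Illustrative accessor, not part of the discussed API.
  def dependencyCount: Int = synchronized { depsShuffleReadMetrics.size }
}
```

With this shape a reader such as HashShuffleReader would call the method once per dependency. It needs no null check and no locking of its own, because the buffer is created eagerly and never exposed.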




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15238942
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -131,7 +122,9 @@ object BlockFetcherIterator {
 val networkSize = blockMessage.getData.limit()
 results.put(new FetchResult(blockId, sizeMap(blockId),
   () => dataDeserialize(blockId, blockMessage.getData, serializer)))
-_remoteBytesRead += networkSize
+// TODO: race conditions can occur here with NettyBlockFetcherIterator
--- End diff --

Might be good to file a JIRA for metrics for the NettyBlockFetcherIterator 
more generally -- I noticed yesterday that the Netty version also doesn't 
report fetchWaitTime




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15242942
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -191,7 +183,7 @@ object BlockFetcherIterator {
 }
   }
   logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
-(numLocal + numRemote) + " blocks")
+(_numBlocksToFetch + localBlocksToFetch.size) + " blocks")
--- End diff --

Doesn't _numBlocksToFetch already include the local blocks?
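
To make the concern concrete, here is a small sketch that keeps the two counts separate, so the "N non-empty blocks out of M blocks" message never adds local blocks twice. It assumes, as the log wording suggests, that the first number counts non-empty blocks across local and remote addresses and that a size of 0 marks an empty block. BlockCountSketch and countBlocks are hypothetical names.

```scala
object BlockCountSketch {
  // Returns (nonEmptyBlocks, totalBlocks) for blocks grouped by address.
  // Each block is an (id, size) pair; size 0 is treated as empty (assumption).
  def countBlocks(blocksByAddress: Seq[(String, Seq[(String, Long)])]): (Int, Int) = {
    var totalBlocks = 0
    var nonEmptyBlocks = 0
    for ((_, blockInfos) <- blocksByAddress) {
      totalBlocks += blockInfos.size                // every block, local or remote
      nonEmptyBlocks += blockInfos.count(_._2 > 0)  // only blocks that hold data
    }
    (nonEmptyBlocks, totalBlocks)
  }
}
```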




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15244576
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
---
@@ -75,9 +76,12 @@ class TaskMetrics extends Serializable {
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
-  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
--- End diff --

Maybe aggregatedShuffleReadMetrics?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49776893
  
At a high level, this depends on one of your other patches (#1056?) to 
incrementally send updates, right?  Is the idea that mergeShuffleReadMetrics 
will get called incrementally as the task runs, before sending partial results 
back to the driver?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49778015
  
Exactly.  The idea is to call mergeShuffleReadMetrics when we're about to 
send the metrics update. 




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15246387
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -191,7 +183,7 @@ object BlockFetcherIterator {
 }
   }
   logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
-(numLocal + numRemote) + " blocks")
+(_numBlocksToFetch + localBlocksToFetch.size) + " blocks")
--- End diff --

Ooh yup




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15246688
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
---
@@ -75,9 +76,12 @@ class TaskMetrics extends Serializable {
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
-  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
--- End diff --

I agree it would be a little clearer, but I'm worried that it's too verbose 
and is going to make a bunch of lines spill over 100 characters.  It's mainly 
read in the driver, which doesn't really care that it's aggregated.

Would a comment be sufficient? 




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15246919
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -131,7 +122,9 @@ object BlockFetcherIterator {
 val networkSize = blockMessage.getData.limit()
 results.put(new FetchResult(blockId, sizeMap(blockId),
   () => dataDeserialize(blockId, blockMessage.getData, serializer)))
-_remoteBytesRead += networkSize
+// TODO: race conditions can occur here with NettyBlockFetcherIterator
--- End diff --

Filed https://issues.apache.org/jira/browse/SPARK-2625

In case it wasn't obvious, the race conditions aren't caused by this patch.




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15247072
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -131,7 +122,9 @@ object BlockFetcherIterator {
 val networkSize = blockMessage.getData.limit()
 results.put(new FetchResult(blockId, sizeMap(blockId),
   () => dataDeserialize(blockId, blockMessage.getData, serializer)))
-_remoteBytesRead += networkSize
+// TODO: race conditions can occur here with NettyBlockFetcherIterator
--- End diff --

Cool thanks for filing that!!




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49782493
  
QA tests have started for PR 1507. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16978/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15250406
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
@@ -154,14 +147,15 @@ object BlockFetcherIterator {
   // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
   // at most maxBytesInFlight in order to limit the amount of data in flight.
   val remoteRequests = new ArrayBuffer[FetchRequest]
+  var totalBlocks = 0
   for ((address, blockInfos) <- blocksByAddress) {
+totalBlocks += blockInfos.size
 if (address == blockManagerId) {
-  numLocal = blockInfos.size
+  readMetrics.localBlocksFetched += blockInfos.size
--- End diff --

Maybe we don't care about this...but this results in the metrics reporting 
that local blocks have been fetched before they're actually read from disk.  Is 
it too annoying to move this to where the blocks actually get read?
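
As a sketch of the alternative being asked about, the counter can be bumped at the point where each local block is actually read, not when the fetch plan is built. The names LocalReadCounter, LocalBlockReads, readLocalBlocks and the readBlock parameter are all hypothetical; the real read path in BlockFetcherIterator is not shown in this thread.

```scala
// Hypothetical stand-in for the local-block counter in ShuffleReadMetrics.
class LocalReadCounter {
  var localBlocksFetched: Int = 0
}

object LocalBlockReads {
  // Returns a lazy iterator over local blocks. The counter is incremented only
  // when a block is pulled from the iterator, that is, when it is really read.
  def readLocalBlocks[A](
      blockIds: Seq[String],
      readBlock: String => A,
      metrics: LocalReadCounter): Iterator[A] = {
    blockIds.iterator.map { id =>
      val data = readBlock(id)
      metrics.localBlocksFetched += 1
      data
    }
  }
}
```

Because the iterator is lazy, metrics reported partway through a task reflect only the blocks consumed so far.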




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15250426
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
-import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.executor.{ShuffleReadMetrics, DataReadMethod, InputMetrics}
--- End diff --

one more alphabetization




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1507#discussion_r15250466
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
---
@@ -73,11 +75,16 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None
 
   /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
+   * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
-  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
+  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
--- End diff --

Can we make this and depsShuffleReadMetrics private again, since they 
should only be updated through createShuffleReadMetricsForDependency() and 
mergeShuffleReadMetrics()?




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49789216
  
Thanks Sandy!! Just a few more small things.




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49794831
  
QA results for PR 1507:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16978/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49814031
  
QA tests have started for PR 1507. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16995/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49820737
  
QA results for PR 1507:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16995/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49581839
  
QA results for PR 1507:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16901/consoleFull




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-21 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49690248
  
@kayousterhout




[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-21 Thread sryza
GitHub user sryza opened a pull request:

https://github.com/apache/spark/pull/1507

SPARK-2565. Update ShuffleReadMetrics as blocks are fetched



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sryza/spark sandy-spark-2565

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1507.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1507


commit 75730ddbb025f837bb635a345b1ecde15e0bb8e7
Author: Sandy Ryza sa...@cloudera.com
Date:   2014-07-18T00:17:19Z

SPARK-2565. Update ShuffleReadMetrics as blocks are fetched






[GitHub] spark pull request: SPARK-2565. Update ShuffleReadMetrics as block...

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1507#issuecomment-49575625
  
QA tests have started for PR 1507. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16901/consoleFull

