[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-12 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-55472830
  
What happens when a recomputation results in the same blockId being regenerated (unpersist followed by recomputation/persist, a block drop followed by recomputation, or something else)? It will now go to some random node, potentially not the same one selected previously, resulting in over-replication?

A more obscure corner case is if the computation was not idempotent and resulted in a changed dataset for the block. Earlier it would get overwritten as part of replication; will we now have two nodes with the same data and a third (the one initially replicated to) which can diverge?

Btw, from what I saw, node loss is not handled, right? So a block can become under-replicated? Would be nice if we added that some day ...


Streaming is not the only application for replication :-) We use it in 
conjunction with locality wait levels to speed up computation when speculative 
execution is enabled.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56293724
  
@tdas handling (1) deterministically will make (2) in line with what we 
currently have.
And that should be sufficient imo.

(3) was not in the context of this patch - but a general shortcoming of Spark currently.
Alleviating (3) might be complicated (not sure how much so) - but it would have some very interesting consequences for performance (among others).

For example, this prevents us from using block persistence for checkpointing - there was a discussion about this in a JIRA a while back (forgot the id) ... Resolving this, combined with 3x-replicated blocks, would mean we get really cheap and very performant checkpointing (while having fault tolerance on par with HDFS).





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-21 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17833363
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+
+    cachedPeers.synchronized {
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+    }
+    cachedPeers
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersReplicatedTo = new HashSet[BlockManagerId]
+    val peersFailedToReplicateTo = new HashSet[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.nanoTime
+    val random = new Random(blockId.hashCode)
+
+    var forceFetchPeers = false
+    var failures = 0
+    var done = false
+
+    // Get a random peer
+    def getRandomPeer(): Option[BlockManagerId] = {
+      val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
+      if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None
--- End diff --

This would mean that, assuming there is no change to number of executors in 
system, we will consistently get the same peer back.
But even if some unrelated peer was added or removed, we wont get back the 
same peer we cached to last time.

Did I read the correctly, or am I missing something ? Thanks.
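
To make the concern concrete, a minimal standalone sketch (not the patch's code; names are illustrative) of why seeding a Random with blockId.hashCode over a hashCode-sorted peer list gives the same choice only while the peer set itself is unchanged:

'''
import scala.util.Random

// Hypothetical sketch of the selection logic under discussion.
def choosePeer(blockIdHash: Int, peers: Seq[String]): Option[String] = {
  val sorted = peers.sortBy(_.hashCode)   // same ordering the patch applies to peers
  val random = new Random(blockIdHash)    // deterministic seed per block
  if (sorted.nonEmpty) Some(sorted(random.nextInt(sorted.size))) else None
}

// Identical peer set => identical choice on re-replication.
choosePeer(42, Seq("bm-1", "bm-2", "bm-3"))
// Adding or removing any (possibly unrelated) peer can shift the chosen peer.
choosePeer(42, Seq("bm-1", "bm-2", "bm-3", "bm-4"))
'''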





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17833383
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+
+    cachedPeers.synchronized {
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+    }
+    cachedPeers
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersReplicatedTo = new HashSet[BlockManagerId]
+    val peersFailedToReplicateTo = new HashSet[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.nanoTime
+    val random = new Random(blockId.hashCode)
+
+    var forceFetchPeers = false
+    var failures = 0
+    var done = false
+
+    // Get a random peer
+    def getRandomPeer(): Option[BlockManagerId] = {
+      val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
+      if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None
    }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")

-      try {
-        blockTransferService.uploadBlockSync(
-          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
-      } catch {
-        case e: Exception =>
-          logError(s"Failed to replicate block to $peer", e)
+    // One by one choose a random peer and try uploading the block to it
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from driver and then pick another random peer for replication. Also
+    // temporarily black list the peer for which replication failed.
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.nanoTime
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
+              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+              .format((System.nanoTime - onePeerStartTime) / 1e6))
+            peersReplicatedTo += peer
+            forceFetchPeers = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true
+            }
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              forceFetchPeers = true
+              peersFailedToReplicateTo += peer
--- End diff --

Ideally, we might want to cache this peersFailedToReplicateTo across block updates for a short TTL (to temporarily blacklist replication to the peer).
But that can be done in a future PR.
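
A minimal sketch of the kind of short-TTL blacklist being suggested (a future-PR idea, not part of this patch; class and parameter names are hypothetical):

'''
import scala.collection.mutable

// Remember peers that recently failed replication and skip them for a short TTL.
class PeerBlacklist(ttlMs: Long = 60 * 1000) {
  private val failedAt = mutable.HashMap[String, Long]()  // peer id -> failure timestamp

  def markFailed(peer: String): Unit = synchronized {
    failedAt(peer) = System.currentTimeMillis
  }

  def isBlacklisted(peer: String): Boolean = synchronized {
    failedAt.get(peer).exists(t => System.currentTimeMillis - t < ttlMs)
  }
}
'''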





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17833419
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+
+    cachedPeers.synchronized {
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+    }
+    cachedPeers
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersReplicatedTo = new HashSet[BlockManagerId]
+    val peersFailedToReplicateTo = new HashSet[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.nanoTime
+    val random = new Random(blockId.hashCode)
+
+    var forceFetchPeers = false
+    var failures = 0
+    var done = false
+
+    // Get a random peer
+    def getRandomPeer(): Option[BlockManagerId] = {
+      val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
+      if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None
    }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")

-      try {
-        blockTransferService.uploadBlockSync(
-          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
-      } catch {
-        case e: Exception =>
-          logError(s"Failed to replicate block to $peer", e)
+    // One by one choose a random peer and try uploading the block to it
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from driver and then pick another random peer for replication. Also
+    // temporarily black list the peer for which replication failed.
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.nanoTime
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
+              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+              .format((System.nanoTime - onePeerStartTime) / 1e6))
+            peersReplicatedTo += peer
+            forceFetchPeers = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true
+            }
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              forceFetchPeers = true
+              peersFailedToReplicateTo += peer
--- End diff --

Btw, curious - will replication fail only when the remote peer is dead? (and so requiring forceFetchPeers)
What about an inability to add the block on the remote peer? Will that cause an exception to be raised here?

Essentially I am trying to understand if an Exception raised here always means the remote peer is 'dead'.
An alternative might be to list peers which have at least data.rewind().remaining() space available: but we don't support that iirc (and it can get used up before we make this call anyway, I guess).



[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17833483
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+
+    cachedPeers.synchronized {
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+    }
+    cachedPeers
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersReplicatedTo = new HashSet[BlockManagerId]
+    val peersFailedToReplicateTo = new HashSet[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.nanoTime
+    val random = new Random(blockId.hashCode)
+
+    var forceFetchPeers = false
+    var failures = 0
+    var done = false
+
+    // Get a random peer
+    def getRandomPeer(): Option[BlockManagerId] = {
+      val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
+      if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None
    }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")

-      try {
-        blockTransferService.uploadBlockSync(
-          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
-      } catch {
-        case e: Exception =>
-          logError(s"Failed to replicate block to $peer", e)
+    // One by one choose a random peer and try uploading the block to it
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from driver and then pick another random peer for replication. Also
+    // temporarily black list the peer for which replication failed.
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.nanoTime
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
+              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+              .format((System.nanoTime - onePeerStartTime) / 1e6))
+            peersReplicatedTo += peer
+            forceFetchPeers = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true
+            }
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              forceFetchPeers = true
+              peersFailedToReplicateTo += peer
+              if (failures > maxReplicationFailures) {
+                done = true
+              }
+          }
+        case None =>
+          // no peer left to replicate to
+          done = true
--- End diff --

What if the initial list had only self in the executor list and we are within the TTL (and so getPeers returns an empty list) - at bootstrapping time, for example.
Do we want to check whether the server has updates for us? This would kind of hose our TTL though ... but maybe it's a corner case.

Or is this handled already? Thanks



[GitHub] spark pull request: SPARK-3561 - Pluggable strategy to facilitate ...

2014-09-22 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2422#issuecomment-56373277
  
Is there an example of how this is going to be leveraged?
The default case is the simple version delegating to existing Spark - it would be good to see how this is used against Tez, so that there is a clearer picture of how the design changes fit into the overall changes proposed, and what further changes (if any) might be required in case we need to generalize the interface.

As @pwendell mentioned, currently this aspect is not exposed - and if we want to expose it via an SPI, we need to have a better understanding of how it will be leveraged.

Also, would Experimental make more sense than DeveloperApi (until it stabilizes)?





[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...

2014-09-23 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1486#issuecomment-56480392
  
Are we proposing to introduce HDFS caching tags/idioms directly into TaskSetManager in this PR?
That does not look right. We need to generalize this so that any RDD can specify process/host (maybe rack also?) annotations.
Once done, HadoopRDD can leverage that.

Depending on an underscore not being in the name, etc., is fragile.
One option would be to define our own URIs, with the default reverting to host only.
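
A minimal sketch of the URI-style encoding being suggested (the scheme names and format are purely illustrative, not an existing Spark API):

'''
// Hypothetical encodings: "process:host/executorId", "rack:rackName", "host:hostName";
// anything else falls back to plain host-only, preserving current behaviour.
sealed trait TaskLocation
case class HostLocation(host: String) extends TaskLocation
case class ProcessLocation(host: String, executorId: String) extends TaskLocation
case class RackLocation(rack: String) extends TaskLocation

def parseLocation(s: String): TaskLocation = s.split(":", 2) match {
  case Array("process", rest) =>
    val Array(host, execId) = rest.split("/", 2)
    ProcessLocation(host, execId)
  case Array("rack", rack) => RackLocation(rack)
  case Array("host", host) => HostLocation(host)
  case _ => HostLocation(s)  // default reverts to host only
}
'''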







[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...

2014-09-23 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1486#issuecomment-56506066
  
@pwendell This is not HadoopRDD-specific functionality - it is a general requirement which can be leveraged by any RDD in Spark - and HadoopRDD currently happens to have a use case for this when DFS caching is used.
The fact that the preferred location is currently a String might be the limitation here, and so extending it for a URI or whatever else will add overhead (including the current patch).

For example: an RDD which pulls data from Tachyon or other distributed memory stores, loading data into accelerator cards and specifying process-local locality for the block, etc. are all uses of the same functionality imo.

If not addressed properly, when the next similar requirement comes along, either we will be rewriting this code or adding more surgical hacks along the same lines.


If the expectation is that Spark won't need to support these other requirements [1], then we can definitely punt on doing a proper design change.

Given this is not a user-facing change (right?), we can definitely take the current approach and replace it later, or do a more principled solution upfront.
@kayousterhout @markhamstra @mateiz any thoughts, given this modifies TaskSetManager for the addition of this feature?


[1] which is unlikely given MLlib's rapid pace of development - it is fairly inevitable that we will need to support accelerator cards sooner rather than later - at least given the arc of our past efforts with ML on Spark.





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/2366#discussion_r17927862
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -787,31 +790,111 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): mutable.HashSet[BlockManagerId] = {
+    cachedPeers.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+    }
+    cachedPeers
+  }
--- End diff --

There is an MT (multi-threading) bug here.
Since cachedPeers is updated in place, it is possible for a 'previous' invocation to still be using cachedPeers while the next invocation is clearing/updating it.

We can avoid that by overwriting the cachedPeers instance variable with the result of master.getPeers.
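
A minimal sketch of the suggested fix, assuming the surrounding BlockManager members from the diff above (conf, master, blockManagerId); instead of clearing and refilling the shared set, build a fresh collection and swap the reference so concurrent readers keep a consistent snapshot:

'''
@volatile private var cachedPeers: Seq[BlockManagerId] = Seq.empty
private var lastPeerFetchTime = 0L

private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = synchronized {
  val ttlMs = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000)
  val expired = System.currentTimeMillis - lastPeerFetchTime > ttlMs
  if (cachedPeers.isEmpty || forceFetch || expired) {
    // Overwrite the reference rather than mutating the old collection in place,
    // so a caller still iterating the previous list is unaffected.
    cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
    lastPeerFetchTime = System.currentTimeMillis
  }
  cachedPeers
}
'''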





[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...

2014-09-23 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2366#issuecomment-56566367
  
@tdas In case I did not mention it before :-) this is definitely a great improvement over what existed earlier!
I would love it if we could (sometime soon, I hope) add support for re-replication of blocks due to lost executors, which is currently outside the scope of this PR, it seems.

Other than the MT bug I mentioned above, this looks good to me!





[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49804353
  
We saw a bunch of EOF Exceptions from SpillReader.

java.io.EOFException
at 
java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2577)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1315)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:79)
at 
org.apache.spark.util.collection.ExternalSorter$SpillReader.org$apache$spark$util$collection$ExternalSorter$SpillReader$$readNextItem(ExternalSorter.scala:526)
at 
org.apache.spark.util.collection.ExternalSorter$SpillReader$$anon$6.hasNext(ExternalSorter.scala:560)
at 
scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter$$anonfun$write$2.apply(SortShuffleWriter.scala:93)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter$$anonfun$write$2.apply(SortShuffleWriter.scala:89)


(Exact line numbers might not match master, but I hope the issue is conveyed.)




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15259118
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15259190
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending 
on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we 
combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. 
Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can 
potentially increase
+  // the size of a merged element when we add values with the same key, 
it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow 
partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not 
provided through

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-22 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49833579
  
I had pulled about 20 mins after I mailed you ...
I have elaborated on why this occurs inline in the code - we can ignore it for now though, since it happens even in the 'regular' case: we had fixed it so long ago that I had forgotten about it and assumed it was some other issue!

We can punt on resolving this for now, and address it when the consolidated shuffle PR comes out.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15274240
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
+ * want the output keys to be sorted. In a map task without map-side 
combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not 
having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or an simple SizeTrackingBuffer if we 
don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
+ *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the 
intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf

[GitHub] spark pull request: SPARK-2634: Change MapOutputTrackerWorker.mapS...

2014-07-23 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1541#issuecomment-49855865
  
Instead of a ConcurrentHashMap, we should actually move it to a disk-backed Map - the cleanup of this data structure is painful - and it can become extremely large, particularly for iterative algorithms.
Fortunately, in most cases we just need the last few entries - and so the LRU scheme used by most disk-backed maps works beautifully.

We have been using MapDB for this in MapOutputTrackerWorker - and it has worked beautifully.
@rxin might be particularly interested since he is looking into reducing the memory footprint of Spark.
CC @mateiz - this is what I had mentioned earlier.
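
As a minimal illustration of the access pattern described above (a plain in-memory LRU via java.util.LinkedHashMap rather than MapDB; in the actual suggestion evicted entries would live in a disk-backed map):

'''
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// Keep only the most recently used entries in memory; evict the rest.
class LruStatusCache[K, V](maxEntries: Int) {
  private val cache = new JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {
    override def removeEldestEntry(eldest: Entry[K, V]): Boolean = size() > maxEntries
  }
  def get(k: K): Option[V] = synchronized { Option(cache.get(k)) }
  def put(k: K, v: V): Unit = synchronized { cache.put(k, v) }
}
'''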





[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15288486
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) 
to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into 
partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. 
Can output a single
+ * partitioned file with a different byte range for each partition, 
suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the 
objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for 
merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID 
and then key
+ * @param ordering optional Ordering to sort keys within each partition; 
should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only 
provide it if you really
+ * want the output keys to be sorted. In a map task without map-side 
combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On 
the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not 
having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a 
SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or an simple SizeTrackingBuffer if we 
don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition 
ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. 
for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. 
This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the 
key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each 
partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, 
along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both 
sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total 
ordering from the
+ *   ordering parameter, or read the keys with the same hash code and 
compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the 
intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+aggregator: Option[Aggregator[K, V, C]] = None,
+partitioner: Option[Partitioner] = None,
+ordering: Option[Ordering[K]] = None,
+serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf

[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-23 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49949511
  
@mateiz The total memory overhead actually goes much higher than num_streams, right?
It should be on the order of num_streams + num_values for this key.

For fairly large values, the latter might fit into memory, but the former might not (particularly as the number of mappers increases).

Or did I get this wrong from the PR?




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50115353
  
Running tests with 
export 
SPARK_JAVA_OPTS=-Dspark.shuffle.manager=org.apache.spark.shuffle.sort.SortShuffleManager
 causes :
'''
- sorting using mutable pairs *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 
3.0:1 failed 4 times, most recent failure: Exception failure in TID 14 on host 
localhost: java.lang.ArrayStoreException: scala.Tuple2
scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
scala.Array$.slowcopy(Array.scala:81)
scala.Array$.copy(Array.scala:107)

scala.collection.mutable.ResizableArray$class.copyToArray(ResizableArray.scala:77)

scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:47)

scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:241)

scala.collection.AbstractTraversable.copyToArray(Traversable.scala:105)

scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:249)
scala.collection.AbstractTraversable.toArray(Traversable.scala:105)

scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)

org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)

org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:61)
org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:581)
org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:581)

org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
org.apache.spark.scheduler.Task.run(Task.scala:51)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1055)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1039)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1037)
  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1037)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:641)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:641)
  at scala.Option.foreach(Option.scala:236)
  at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:641)
  ...
'''

The actual classes are :
'''
classes = [class scala.Tuple2] ; 
[class scala.Tuple2,
class scala.Tuple2,
class scala.Tuple2]
'''
obtained using :
'''
  def collect(): Array[T] = {
    println("classes = " + sc.runJob(this, (iter: Iterator[T]) =>
      iter.map(v => if (v == null) null else v.getClass).mkString("[", ",\n\t\t", "]")).
      mkString("", " ; \n\t", ""))
    System.out.flush()
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
''' in RDD.collect




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50115453
  
BTW, this is one of 5 failures from core.
I hope there are no merge issues though.




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50116492
  
ah, thanks! Rerunning with 9c29957.
Can't pull the PR - and manual merge is painful, hence the delays in testing :-)




[GitHub] spark pull request: [SPARK-2671] BlockObjectWriter should create p...

2014-07-26 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1580#issuecomment-50246267
  
Actually we have also seen this happen multiple times.
A few of them have been fixed, but not all have been identified.

For example, there is incorrect DCL (double-checked locking) for directory creation in Spark.

The tricky bit is preventing creation of directories when a shutdown is happening (either via exit or via a driver message).


The consolidated shuffle bug fix includes some attempts to resolve it, but since we could not identify/nail down all cases, we introduced something similar: retry if there is a FileNotFoundException when creating the file stream, but skip the retry if we are about to shut down (so that we don't mess up the shutdown hooks trying to remove directories!).
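
A minimal sketch of that retry idea (the helper name and the shutdown check are hypothetical): recreate the parent directory and retry once when the stream cannot be opened, but skip the retry while a shutdown is in progress so the shutdown hooks can still remove directories.

'''
import java.io.{File, FileNotFoundException, FileOutputStream}

def openWithRetry(file: File, shuttingDown: () => Boolean): FileOutputStream = {
  try {
    new FileOutputStream(file, true)
  } catch {
    case _: FileNotFoundException if !shuttingDown() =>
      // Parent directory may have been removed concurrently; recreate and retry once.
      file.getParentFile.mkdirs()
      new FileOutputStream(file, true)
  }
}
'''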


Regards,
Mridul



On Fri, Jul 25, 2014 at 11:41 PM, Matei Zaharia notificati...@github.com
wrote:

 But the point above was that the code that creates this object goes
 through DiskBlockManager.getFile, which already creates any non-existent
 directories. So I don't think this will be a problem, unless a directory 
is
 deleted exactly in the instant when we return a File and we start writing.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1580#issuecomment-50184666.





[GitHub] spark pull request: SPARK-2294: fix locality inversion bug in Task...

2014-07-27 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1313#issuecomment-50257258
  
Since all process-local tasks are also node-, rack- and any-level tasks, we will incur the node-local delay as well.
On 27-Jul-2014 11:09 am, Matei Zaharia notificati...@github.com wrote:

 I thought that we can skip locality levels in the waiting phase if we have
 no tasks for them -- that's why we have computeValidLocalityLevels. In
 the case you mentioned, the valid levels would only be PROCESS_LOCAL and
 NO_PREFS.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1313#issuecomment-50256511.





[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread mridulm
GitHub user mridulm opened a pull request:

https://github.com/apache/spark/pull/1609

[SPARK-2532] WIP Consolidated shuffle fixes

Status of the PR
- [X] Cherry pick and merge changes from internal branch to spark master
- [X] Remove WIP comments and 2G branch references.
- [X] Tests for BlockObjectWriter
- [ ] Tests for ExternalAppendOnlyMap
- [ ] Tests for MapOutputTracker
- [ ] Tests for ShuffleBlockManager




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1609.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1609


commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan mridul...@apache.org
Date:   2014-07-27T15:23:05Z

Consolidated shuffle fixes






[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15442779
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala 
---
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import java.io.{IOException, FileOutputStream, OutputStream, File}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/**
+ * Test various code paths in DiskBlockObjectWriter
+ */
+class DiskBlockObjectWriterSuite extends FunSuite {
+
+  private val conf = new SparkConf
+  private val BUFFER_SIZE = 32 * 1024
+
+  private def tempFile(): File = {
+    val file = File.createTempFile("temp_", "block")
+    // We dont want file to exist ! Just need a temp file name
+    file.delete()
+    file
+  }
+
+  private def createWriter(file: File = tempFile()) :
+      (File, DiskBlockObjectWriter) = {
+    file.deleteOnExit()
+
+    (file, new DiskBlockObjectWriter(BlockId("test_1"), file,
+      new JavaSerializer(conf), BUFFER_SIZE, (out: OutputStream) => out, true))
+  }
+
+
+  test("write after close should throw IOException") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.commitAndClose()
+
+    intercept[IOException] {
+      bow.write("test2")
+    }
+
+    file.delete()
+  }
+
+  test("write after revert should throw IOException") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.revertPartialWritesAndClose()
+
+    intercept[IOException] {
+      bow.write("test2")
+    }
+
+    file.delete()
+  }
+
+  test("create even if directory does not exist") {
+    val dir = File.createTempFile("temp_", "dir")
+    dir.delete()
+
+    val file = new File(dir, "temp.file")
+    file.deleteOnExit()
+
+    val bow = new DiskBlockObjectWriter(BlockId("test_1"), file, new JavaSerializer(conf),
+      BUFFER_SIZE, (out: OutputStream) => out, true)
+
+    bow.write("test")
+    assert (file.exists() && file.isFile)
+    bow.commitAndClose()
+    Utils.deleteRecursively(dir)
+  }
+
+  test("revert of new file should delete it") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.revertPartialWritesAndClose()
+    assert (! file.exists())
+    // file.delete()
+  }
+
+  test("revert of existing file should revert it to previous state") {
+    val (file, bow1) = createWriter()
+
+    bow1.write("test")
+    bow1.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow1.commitAndClose()
+    val length = file.length()
+
+    // reopen same file.
+    val bow2 = createWriter(file)._2
+
+    bow2.write("test3")
+    bow2.write("test4")
+
+    assert (file.exists() && file.isFile)
+
+    bow2.revertPartialWritesAndClose()
+    assert (file.exists())
+    assert (length == file.length())
+    file.delete()
+  }
+
+  test("revert of writer after close should delete if it did not exist earlier") {
+    val (file, bow) = createWriter(tempFile())
+
+    bow.write("test")
+    bow.write("test1")
+    assert (file.exists() && file.isFile)
+
+    bow.commitAndClose()
+    val length = file.length()
+
+    assert (file.exists() && file.isFile)
+    assert (length > 0)
+
+    // Now revert the file, after it has been closed : should delete the file
+    // since it did not exist earlier.
+    bow.revertPartialWritesAndClose()
+    assert (! file.exists

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15448803
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -935,15 +941,22 @@ private[spark] object Utils extends Logging {
* Currently, this detects whether the JVM is shutting down by 
Runtime#addShutdownHook throwing
* an IllegalStateException.
*/
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+shutdownStarted = true
+  }
   def inShutdown(): Boolean = {
 try {
+  if (shutdownStarted) return true
--- End diff --

Unfortunately, had to be reverted :-(




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50306648
  
Accidental close, apologies !




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm closed the pull request at:

https://github.com/apache/spark/pull/1609




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50306633
  
@witgo I did not understand the space issue: stylecheck seems to run fine.

Regarding the actual issues: the JIRA lists some of them - unfortunately
it is not exhaustive.
Spark code assumes a few things (see the sketch after this list):
1) A flush followed by close should not cause additional data to be written
to the stream - which is not valid in the general case (close can still write
more data, e.g. a compression stream writing its trailer).
2) Reading an object from a stream will consume all the data written as part of
the object - which is not valid in the general case; additional info could have
been written after the object was written (like the reset markers in the Java
serde). So stream wrapping has to account for that.
3) 

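For point (2), a minimal sketch with plain JDK serialization (my toy example, not
Spark's serializer wrapper), showing why "bytes written for an object" and "bytes
consumed by readObject" need not line up once reset markers are involved:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    object ResetMarkerDemo extends App {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      (0 until 5).foreach { i =>
        out.writeObject(Integer.valueOf(i))
        out.reset()   // emits a reset marker into the stream, after the object itself
      }
      out.flush()

      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      (0 until 5).foreach { _ =>
        println(in.readObject())   // the reset markers are consumed transparently here
      }
    }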



[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
GitHub user mridulm reopened a pull request:

https://github.com/apache/spark/pull/1609

[SPARK-2532] WIP Consolidated shuffle fixes

Status of the PR
- [X] Cherry pick and merge changes from internal branch to spark master
- [X] Remove WIP comments and 2G branch references.
- [X] Tests for BlockObjectWriter
- [ ] Tests for ExternalAppendOnlyMap
- [x] Tests for ShuffleBlockManager




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1609.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1609


commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan mridul...@apache.org
Date:   2014-07-27T15:23:05Z

Consolidated shuffle fixes

commit 66d6ec3f99882ad6062c5bff36f2edb82b0c24c0
Author: Mridul Muralidharan mridul...@apache.org
Date:   2014-07-27T15:40:32Z

Add missing setShutdownStarted hooks

commit 027c7f18c44c57960a2a94eee961f0aa811e7a34
Author: Mridul Muralidharan mridul...@apache.org
Date:   2014-07-27T16:05:31Z

stylecheck fixes

commit 195c529c1ae5ffa7e8f9cf6af4df8b9536a39d6a
Author: Mridul Muralidharan mridul...@apache.org
Date:   2014-07-27T23:46:53Z

Fix build, add testcases for DiskBlockManagerSuite

commit 6095545bf55a87ef0b28bc11adc63dcc5b661b6c
Author: Mridul Muralidharan mridul...@apache.org
Date:   2014-07-27T23:50:45Z

Consolidated fixes

commit 1c1faea69d9709c7e65afc9bdd13a8e0d5488c82
Author: Mridul Muralidharan mridul...@apache.org
Date:   2014-07-27T23:50:50Z

Merge branch 'consolidated_shuffle_fixes' of github.com:mridulm/spark into 
consolidated_shuffle_fixes

commit fbf20f792baf7ab8d6705c7a9525c2db92bb7ae3
Author: Mridul Muralidharan mridul...@apache.org
Date:   2014-07-28T06:59:45Z

Disable code to detect programming shutdown via stop's. Make actor/store 
shutdown in DiskBlockManagerSuite more robust




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50307155
  
Jenkins, test this please




[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

2014-07-28 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15449433
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
 }
 false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+if (shutdownStarted) return true
+doShutdownCheck()
+shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+var shutdown = false
+try {
+  val hook = new Thread {
+override def run() {}
+  }
+  Runtime.getRuntime.addShutdownHook(hook)
+  Runtime.getRuntime.removeShutdownHook(hook)
+} catch {
+  case ise: IllegalStateException =>
+shutdown = true
+} finally {
+  shutdownStarted = shutdown
+}
+  }
+  */
--- End diff --

This (and the related commented-out inShutdown references) has been left around in
case someone else can suggest an improvement!
The issue is that this works fine in 'normal' use, but it fails while running Spark
tests in local mode.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50455483
  
All pending fixes and work are done.
I don't think there are any pieces missing in the merge from the internal branch
to master.
Open for review, thanks!




[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

2014-07-29 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1542#issuecomment-50488319
  
@pwendell @mateiz was this PR really merged into spark ?




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15537366
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -40,7 +40,7 @@ private[spark] class JavaSerializationStream(out: 
OutputStream, counterReset: In
*/
   def writeObject[T: ClassTag](t: T): SerializationStream = {
 objOut.writeObject(t)
-if (counterReset > 0 && counter >= counterReset) {
+if (counterReset >= 0 && counter >= counterReset) {
--- End diff --

This was done only to support adding a marker after each object has been written.
The only practical reason to do this is to test that part.




[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

2014-07-29 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1542#issuecomment-50509704
  
That was super scary! Thanks for clarifying @aarondav 




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15540734
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -116,8 +118,13 @@ class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        try {
+          writer.revertPartialWritesAndClose()
+        } catch {
+          // Ensure that all revert's get done - log exception and continue
+          case ex: Exception =>
+            logError("Exception reverting/closing writers", ex)
+        }
   }
--- End diff --

revert/close can throw an exception, causing the other writers to be left hanging.
Hence, log and continue.




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15540782
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -71,7 +72,8 @@ class HashShuffleWriter[K, V](
 try {
   return Some(commitWritesAndBuildStatus())
 } catch {
-      case e: Exception =>
+      case e: Throwable =>
+        success = false // for finally block
 revertWrites()
--- End diff --

If success is not set to false here, the subsequent release of the writers will
attempt to register the output as if the commit had succeeded.
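
A toy illustration of the ordering (hypothetical names, not the actual
HashShuffleWriter code): the flag has to be cleared in the catch block so that the
finally block sees the failure.

    object SuccessFlagDemo extends App {
      var success = true
      try {
        throw new RuntimeException("commit failed")   // stand-in for the failing commit
      } catch {
        case e: Throwable =>
          success = false   // for the finally block
          // revert the writes here
      } finally {
        println(s"release writers with success = $success")   // prints: success = false
      }
    }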




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15540934
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert (! initialized)
+    assert (! wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it dont seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && ! Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, dont bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
--- End diff --

As mentioned in the comments, this tries to retry once in case file could 
not be created due to lack of presence of directory (which is what the 
FileNotFoundException is usually for) : except when we are already in shutdown.
This is a case which happens due to some race in spark between creation of 
directory and allowing files to be created under that directory.

We have fixed a double checked locking bug below (in DiskBlockManager) but 
looks like it is not sufficient - since this was observed even after that.
(In our branch, the logDebug is actually logError to flush out these cases).




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15541003
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert (! initialized)
+    assert (! wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it dont seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && ! Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, dont bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
+          return
+        } else throw fEx
+    }
+
+    try {
+      // This is to workaround case where creation of object and actual init
+      // (which can happen much later) happens after a delay and the cleanup thread
+      // cleaned up the file.
+      channel = fos.getChannel
+      val fosPos = channel.position()
+      if (initialPosition != fosPos) {
+        throw new IOException("file cleaned up ? " + file.exists() +
+          ", initialpos = " + initialPosition +
+          " current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
+      }
+
+      ts = new TimeTrackingOutputStream(fos)
+      val bos = new BufferedOutputStream(ts, bufferSize)
+      bs = compressStream(bos)
+      objOut = serializer.newInstance().serializeStream(bs)
+      initialized = true
+      wasOpenedOnce = true;
+    } finally {
+      if (! initialized) {
+        // failed, cleanup state.
+        val tfos = fos
+        updateCloseState()
+        tfos.close()
+      }
+    }
+  }
+
+  private def open(): BlockObjectWriter = {
+init()
 lastValidPosition = initialPosition
-bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-objOut = serializer.newInstance().serializeStream(bs)
-initialized = true
 this
   }
 
-  override def close() {
-if (initialized) {
-  if (syncWrites

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15541065
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -188,6 +425,39 @@ private[spark] class DiskBlockObjectWriter(
 
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-lastValidPosition - initialPosition
+    val retval = lastValidPosition - initialPosition
+
+    assert(retval >= 0 || Utils.inShutdown(),
+      "exists = " + file.exists() + ", bytesWritten = " + retval +
+      ", lastValidPosition = " + lastValidPosition + ", initialPosition = " + initialPosition +
+      ", in shutdown = " + Utils.inShutdown())
+
+    // TODO: Comment this out when we are done validating : can be expensive due to file.length()
+    assert (file.length() >= lastValidPosition || Utils.inShutdown(),
+      "exists = " + file.exists() + ", file len = " + file.length() +
+      ", bytesWritten = " + retval + ", lastValidPosition = " + lastValidPosition +
+      ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown())
+
+    if (retval >= 0) retval else 0
   }
 }
+
+object DiskBlockObjectWriter {
+
+  // Unfortunately, cant do it atomically ...
+  private def truncateIfExists(file: File, truncatePos: Long) {
+    var fos: FileOutputStream = null
+    try {
+      // There is no way to do this atomically iirc.
+      if (file.exists() && file.length() != truncatePos) {
+        fos = new FileOutputStream(file, true)
+        fos.getChannel.truncate(truncatePos)
+      }
+    } finally {
+      if (fos ne null) {
+        fos.close()
+      }
+    }
+  }
+}
--- End diff --

Hopefully, rest of the code changes in this class is documented (a bit too 
heavily probably ? lighten it before final commit ?) 




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15541257
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala ---
@@ -236,31 +241,61 @@ object ShuffleBlockManager {
   new PrimitiveVector[Long]()
 }
 
-def numBlocks = mapIdToIndex.size
+/*
+ * This is required for shuffle consolidation to work. In particular 
when updates to file are
+ * happening while parallel requests to fetch block happens.
+ */
+private val blockLengthsByReducer = 
Array.fill[PrimitiveVector[Long]](files.length) {
+  new PrimitiveVector[Long]()
+}
+
+private var numBlocks = 0
--- End diff --

The reason for the change from def to var is perhaps subtle.
Consider this sequence of calls:

add to mapIdToIndex with mapId 0
add to mapIdToIndex with mapId 1
add to mapIdToIndex with mapId 0 (on re-execution)
add to mapIdToIndex with mapId 1 (on re-execution)

Now both 0 and 1 will end up with the same index assigned (since the index was
derived from mapIdToIndex.size), as the toy sketch below illustrates.
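
A toy sketch of that collision (not the actual ShuffleBlockManager code):

    import scala.collection.mutable

    object IndexFromSizeDemo extends App {
      val mapIdToIndex = mutable.HashMap[Int, Int]()
      // Deriving the index from the map's current size breaks once a mapId is
      // re-inserted, because re-insertion does not grow the size.
      def record(mapId: Int): Unit = { mapIdToIndex(mapId) = mapIdToIndex.size }

      record(0)   // 0 -> 0
      record(1)   // 1 -> 1
      record(0)   // re-execution: 0 -> 2
      record(1)   // re-execution: 1 -> 2, colliding with mapId 0
      println(mapIdToIndex)   // both mapIds now share index 2
    }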




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15541435
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
*/
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: 
ArrayBuffer[Long])
 extends Iterator[(K, C)] {
-private val fileStream = new FileInputStream(file)
-private val bufferedStream = new BufferedInputStream(fileStream, 
fileBufferSize)
+
+    assert (! batchSizes.isEmpty)
+    assert (! batchSizes.exists(_ <= 0))
+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
--- End diff --

This is to give us the starting offset of each batch, rather than relying on
wherever the previous batch's read happened to end.
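
With toy numbers (just to show the shape of the result):

    val batchSizes = Seq(100L, 250L, 80L)
    val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
    // batchOffsets == List(0, 100, 350, 430): batch i occupies
    // [batchOffsets(i), batchOffsets(i + 1)), so every batch is read from an
    // absolute offset instead of from wherever the previous read stopped.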





[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15542308
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -418,7 +459,25 @@ class ExternalAppendOnlyMap[K, V, C](
 
 // TODO: Ensure this gets called even if the iterator isn't drained.
 private def cleanup() {
-  deserializeStream.close()
+  batchIndex = batchOffsets.length
+  val dstrm = deserializeStream
+  val fstrm = fileStream
+  deserializeStream = null
+  fileStream = null
+
+      if (dstrm ne null) {
+        try {
+          dstrm.close()
+        } catch {
+          case ioEx: IOException => {
+            // best case attempt - atleast free the handles
+            if (fstrm ne null) {
+              try { fstrm.close() } catch { case ioEx: IOException => }
+            }
+            throw ioEx
+          }
+        }
+      }
--- End diff --

This is just more defensive cleanup compared to earlier, plus setting batchIndex
to an unusable value.
It ensures that any close-related exceptions do not result in unclosed files
(which would impact the ulimit of the Spark process).




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1609#issuecomment-50517754
  
I have added some comments to the PR in the hope that they will aid the review.

I am sure it is still an involved process in spite of this, so please do feel
free to raise as many queries as required: sometimes they trigger the
unearthing of other issues as part of the discussion.
I want to ensure that we do not miss any subtle issue here.




[GitHub] spark pull request: SPARK-2045 Sort-based shuffle

2014-07-29 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50522967
  
@mateiz please refer to changes here : 
https://github.com/apache/spark/pull/1609/files#diff-10
They should be relevant to this PR too




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15565447
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
--- End diff --

As I mentioned above, we did fix a DCL bug, but that did not seem sufficient.
I agree this is a rare condition, and the 'fix' is a hack to work around the
problem: but pending identification of the root cause, it is unfortunately the
best we have.
Any thoughts ?
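
Purely to illustrate the pattern under discussion, a minimal sketch (hypothetical
helper, not the DiskBlockManager code): the existence check has to be re-validated
under a lock, and even then an external delete can still race in, which is why the
writer above also retries on FileNotFoundException.

    import java.io.{File, IOException}

    object DirCreation {
      private val lock = new Object

      def ensureDir(dir: File): File = lock.synchronized {
        // mkdirs() may return false if another thread created the directory first,
        // so re-check isDirectory before treating it as a failure.
        if (!dir.exists() && !dir.mkdirs() && !dir.isDirectory) {
          throw new IOException("Failed to create directory " + dir)
        }
        dir
      }
    }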




[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15565486
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to 
decide
+  // if we should delete this file or not. Also used to detect if file was 
deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+
   private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L
 
-  override def open(): BlockObjectWriter = {
-fos = new FileOutputStream(file, true)
-ts = new TimeTrackingOutputStream(fos)
-channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been 
observed that
+  // sometimes file creation happens 'before' the actual directory has 
been created
+  // So we attempt to retry atleast once with a mkdirs in case directory 
was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert (! initialized)
+    assert (! wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it dont seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && ! Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, dont bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
+          return
+        } else throw fEx
+    }
+
+    try {
+      // This is to workaround case where creation of object and actual init
+      // (which can happen much later) happens after a delay and the cleanup thread
+      // cleaned up the file.
+      channel = fos.getChannel
+      val fosPos = channel.position()
+      if (initialPosition != fosPos) {
+        throw new IOException("file cleaned up ? " + file.exists() +
+          ", initialpos = " + initialPosition +
+          " current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
+      }
+
+      ts = new TimeTrackingOutputStream(fos)
+      val bos = new BufferedOutputStream(ts, bufferSize)
+      bs = compressStream(bos)
+      objOut = serializer.newInstance().serializeStream(bs)
+      initialized = true
+      wasOpenedOnce = true;
+    } finally {
+      if (! initialized) {
+        // failed, cleanup state.
+        val tfos = fos
+        updateCloseState()
+        tfos.close()
+      }
+    }
+  }
+
+  private def open(): BlockObjectWriter = {
+init()
 lastValidPosition = initialPosition
-bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-objOut = serializer.newInstance().serializeStream(bs)
-initialized = true
 this
   }
 
-  override def close() {
-if (initialized) {
-  if (syncWrites

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

2014-07-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1609#discussion_r15565552
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
 }
 false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+if (shutdownStarted) return true
+doShutdownCheck()
+shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+var shutdown = false
+try {
+  val hook = new Thread {
+override def run() {}
+  }
+  Runtime.getRuntime.addShutdownHook(hook)
+  Runtime.getRuntime.removeShutdownHook(hook)
+} catch {
+  case ise: IllegalStateException =>
+shutdown = true
+} finally {
+  shutdownStarted = shutdown
+}
+  }
+  */
--- End diff --

Sure, will move it out when I push next.
It becomes directly relevant to this patch since there are assertions which
check that either the file/directory is in the expected state or the VM is in
shutdown (and so cleanup happened/is happening, which caused the file deletions).
For VM shutdown this is just an optimization, but for a shutdown ordered by the
driver, inShutdown will return false even though the same code path as shutdown
is invoked by Spark (stop on the various subsystems), resulting in exceptions and
assertion failures in threads which are still running.

Unfortunately, this diff interacts badly with local mode, particularly the tests,
since they keep reusing the same VM.
Any ideas on how to 'fix' or resolve this ? Thanks




[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...

2014-07-31 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1678#discussion_r15682389
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
 if (shuffle != null  shuffle.writers != null) {
   for (writer - shuffle.writers) {
-writer.revertPartialWrites()
-writer.close()
+writer.revertPartialWritesAndClose()
--- End diff --

revert can throw an exception, which will cause the other writers to not be reverted.
We need to wrap it in a try/catch, log, and continue.
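
A small sketch of what is being suggested (generic stand-ins for the real types;
the same pattern is used in the consolidated-fixes PR):

    trait Revertable { def revertPartialWritesAndClose(): Unit }

    // Revert each writer independently so that one failing revert/close does not
    // leave the remaining writers un-reverted: log and continue.
    def revertAll(writers: Seq[Revertable], log: (String, Throwable) => Unit): Unit = {
      for (writer <- writers) {
        try {
          writer.revertPartialWritesAndClose()
        } catch {
          case e: Exception => log("Exception reverting/closing writer", e)
        }
      }
    }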




[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...

2014-07-31 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1678#discussion_r15682412
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
 if (initialized) {
   // NOTE: Because Kryo doesn't flush the underlying stream we 
explicitly flush both the
   //   serializer stream and the lower level stream.
   objOut.flush()
   bs.flush()
-  val prevPos = lastValidPosition
-  lastValidPosition = channel.position()
-  lastValidPosition - prevPos
-} else {
-  // lastValidPosition is zero if stream is uninitialized
-  lastValidPosition
+  close()
 }
+finalPosition = file.length()
   }
 
-  override def revertPartialWrites() {
-if (initialized) {
-  // Discard current writes. We do this by flushing the outstanding 
writes and
-  // truncate the file to the last valid position.
-  objOut.flush()
-  bs.flush()
-  channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes 
and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+try {
+  if (initialized) {
+objOut.flush()
+bs.flush()
+close()
+  }
+
+  val truncateStream = new FileOutputStream(file, true)
+  try {
+truncateStream.getChannel.truncate(initialPosition)
+  } finally {
+truncateStream.close()
+  }
+} catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
--- End diff --

In the way the writers are used in HashShuffleWriter, it is possible for a closed
stream to be reverted (if some other stream's close failed, for example).
In the above, that will leave this file with leftover data; I am not sure
what the impact of that would be.




[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...

2014-07-31 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1678#discussion_r15682457
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
--- End diff --

We should remove close from the interface, and make it private to this 
class btw.




[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...

2014-07-31 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1678#discussion_r15683205
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
 if (initialized) {
   // NOTE: Because Kryo doesn't flush the underlying stream we 
explicitly flush both the
   //   serializer stream and the lower level stream.
   objOut.flush()
   bs.flush()
-  val prevPos = lastValidPosition
-  lastValidPosition = channel.position()
-  lastValidPosition - prevPos
-} else {
-  // lastValidPosition is zero if stream is uninitialized
-  lastValidPosition
+  close()
 }
+finalPosition = file.length()
   }
 
-  override def revertPartialWrites() {
-if (initialized) {
-  // Discard current writes. We do this by flushing the outstanding 
writes and
-  // truncate the file to the last valid position.
-  objOut.flush()
-  bs.flush()
-  channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes 
and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+try {
+  if (initialized) {
+objOut.flush()
+bs.flush()
+close()
+  }
+
+  val truncateStream = new FileOutputStream(file, true)
+  try {
+truncateStream.getChannel.truncate(initialPosition)
+  } finally {
+truncateStream.close()
+  }
+} catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
--- End diff --

I meant the former case: close on a writer fails with an exception while the
earlier streams succeeded.
So now we have some writers which have committed data (which is not removed
by the subsequent revert) while others are reverted.

On the face of it, I agree, it should not cause issues: but since the
expectation from this class is never enforced, it can silently fail.




[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...

2014-07-31 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1678#discussion_r15683224
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
--- End diff --

When I merged the sort patch and modified EAOM, it was simply a matter of replacing
close with commitAndClose.
commitAndClose should be semantically equivalent to close, actually.
It is not equivalent to commit() - but we want to remove that :-)




[GitHub] spark pull request: [SPARK-2033] Automatically cleanup checkpoint

2014-08-01 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/855#issuecomment-50895949
  
This definitely is much better, thanks for the PR !




[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1678#discussion_r15701250
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
 if (initialized) {
   // NOTE: Because Kryo doesn't flush the underlying stream we 
explicitly flush both the
   //   serializer stream and the lower level stream.
   objOut.flush()
   bs.flush()
-  val prevPos = lastValidPosition
-  lastValidPosition = channel.position()
-  lastValidPosition - prevPos
-} else {
-  // lastValidPosition is zero if stream is uninitialized
-  lastValidPosition
+  close()
 }
+finalPosition = file.length()
   }
 
-  override def revertPartialWrites() {
-if (initialized) {
-  // Discard current writes. We do this by flushing the outstanding 
writes and
-  // truncate the file to the last valid position.
-  objOut.flush()
-  bs.flush()
-  channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes 
and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+try {
+  if (initialized) {
+objOut.flush()
+bs.flush()
+close()
+  }
+
+  val truncateStream = new FileOutputStream(file, true)
+  try {
+truncateStream.getChannel.truncate(initialPosition)
+  } finally {
+truncateStream.close()
+  }
+} catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
--- End diff --

Ah, did not notice that the if (initialized) did not include the truncate 
call !




[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1486#discussion_r15725601
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -243,10 +244,23 @@ class HadoopRDD[K, V](
 new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
   }
 
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-// TODO: Filtering out localhost in case of file:// URLs
-val hadoopSplit = split.asInstanceOf[HadoopPartition]
-    hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
+  override def getPreferredLocations(hsplit: Partition): Seq[String] = {
+    val split = hsplit.asInstanceOf[HadoopPartition].inputSplit.value
+    val locs = HadoopRDD.NEW_HADOOP_CLASSES match {
+      case Some(c) =>
+        try {
+          val lsplit = c.inputSplitWithLocationInfo.cast(split)
+          val infos = c.getLocationInfo.
+            invoke(lsplit).asInstanceOf[Array[Object]]
+          Some(HadoopRDD.convertSplitLocationInfo(infos))
--- End diff --

Given how frequently this is invoked, instead of using reflection it is better to
have an interface and a specific impl to handle this case (where the new hadoop
APIs are called directly, not accessed via reflection).
Of course, the initialization to use this new impl or None would be based on the
existence of the methods, and so decided via reflection - but that would be a
one-time operation (replacing NEW_HADOOP_CLASSES).
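
A sketch of that suggestion (hypothetical names, not the actual patch): do the
reflection once at initialization, then dispatch through a plain trait on every
getPreferredLocations call.

    trait SplitLocationResolver {
      def locations(split: AnyRef): Seq[String]
    }

    // Compiled against the newer Hadoop jars; calls the new API directly, no reflection.
    class NewApiSplitLocationResolver extends SplitLocationResolver {
      def locations(split: AnyRef): Seq[String] = Seq.empty   // placeholder for the direct calls
    }

    object SplitLocationResolver {
      // Evaluated exactly once, replacing the per-call reflection on NEW_HADOOP_CLASSES.
      val instance: Option[SplitLocationResolver] =
        try {
          // Class name assumed from the newer Hadoop API discussed in this PR.
          Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
          Some(new NewApiSplitLocationResolver)
        } catch {
          case _: ClassNotFoundException => None
        }
    }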




[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1486#discussion_r15725610
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -216,6 +216,7 @@ abstract class RDD[T: ClassTag](
   getPreferredLocations(split)
 }
   }
+//out.map(PartitionLocation.fromString(_))
 
--- End diff --

please revert changes to this file




[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1722#discussion_r15725631
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](
 
 if (objectsWritten == serializerBatchSize) {
   flush()
-  writer.close()
   writer = blockManager.getDiskWriter(blockId, file, serializer, 
fileBufferSize)
 }
   }
   if (objectsWritten > 0) {
 flush()
+  } else if (writer != null) {
--- End diff --

when objectsWritten == 0, writer != null will hold.




[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1722#discussion_r15725641
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
* An iterator that returns (K, C) pairs in sorted order from an on-disk 
map
*/
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: 
ArrayBuffer[Long])
-extends Iterator[(K, C)] {
-private val fileStream = new FileInputStream(file)
-private val bufferedStream = new BufferedInputStream(fileStream, 
fileBufferSize)
+extends Iterator[(K, C)]
+  {
--- End diff --

Those asserts caught the bugs :-) But yeah, some of them might have been 
expensive.




[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1722#discussion_r15725667
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
* An iterator that returns (K, C) pairs in sorted order from an on-disk 
map
*/
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: 
ArrayBuffer[Long])
-extends Iterator[(K, C)] {
-private val fileStream = new FileInputStream(file)
-private val bufferedStream = new BufferedInputStream(fileStream, 
fileBufferSize)
+extends Iterator[(K, C)]
+  {
+private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size 
will be batchSize.length + 1
+assert(file.length() == batchOffsets(batchOffsets.length - 1))
+
+private var batchIndex = 0  // Which batch we're in
+private var fileStream: FileInputStream = null
 
 // An intermediate stream that reads from exactly one batch
 // This guards against pre-fetching and other arbitrary behavior of 
higher level streams
-private var batchStream = nextBatchStream()
-private var compressedStream = 
blockManager.wrapForCompression(blockId, batchStream)
-private var deserializeStream = ser.deserializeStream(compressedStream)
+private var deserializeStream = nextBatchStream()
 private var nextItem: (K, C) = null
 private var objectsRead = 0
 
 /**
  * Construct a stream that reads only from the next batch.
  */
-private def nextBatchStream(): InputStream = {
-  if (batchSizes.length > 0) {
-ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+private def nextBatchStream(): DeserializationStream = {
+  // Note that batchOffsets.length = numBatches + 1 since we did a 
scan above; check whether
+  // we're still in a valid batch.
+  if (batchIndex < batchOffsets.length - 1) {
+if (deserializeStream != null) {
+  deserializeStream.close()
+  fileStream.close()
+  deserializeStream = null
+  fileStream = null
+}
+
+val start = batchOffsets(batchIndex)
+fileStream = new FileInputStream(file)
+fileStream.getChannel.position(start)
+batchIndex += 1
+
+val end = batchOffsets(batchIndex)
+
+assert(end >= start, "start = " + start + ", end = " + end +
+  ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+val bufferedStream = new 
BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+val compressedStream = blockManager.wrapForCompression(blockId, 
bufferedStream)
+ser.deserializeStream(compressedStream)
--- End diff --

So that is something I was not sure of, particularly with kryo (not Java).
We were seeing the input buffer getting stepped on from various threads - 
this was specifically in the context of the 2G fixes though, where we had to modify the 
way the buffer was created anyway. I don't know if the initialization changes 
something else.




[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1722#discussion_r15725700
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -455,7 +495,25 @@ class ExternalAppendOnlyMap[K, V, C](
 
 // TODO: Ensure this gets called even if the iterator isn't drained.
 private def cleanup() {
-  deserializeStream.close()
+  batchIndex = batchOffsets.length  // Prevent reading any other batch
+  val ds = deserializeStream
+  val fs = fileStream
+  deserializeStream = null
+  fileStream = null
+
+  if (ds != null) {
+try {
+  ds.close()
+} catch {
+  case e: IOException =
+// Make sure we at least close the file handle
+if (fs != null) {
+  try { fs.close() } catch { case e2: IOException => }
--- End diff --

This is just paranoid stuff, you can actually remove the catch IOException part.
The reason it exists is because we have a ulimit'ed FileInputStream - which 
enforces a ulimit of 8k on Spark (which is the fd limit in our clusters).
For large reducers, this prevents the task from getting killed.
So this is an attempt to ensure that the stream is truly closed.

I did not realize this had leaked out.
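
For context, a stripped-down sketch of the idea (not our actual class; 8192 is just 
the assumed fd budget): a shared permit pool caps how many file descriptors the 
process keeps open, and close() releases the permit exactly once.

    import java.io.{File, FileInputStream}
    import java.util.concurrent.Semaphore
    import java.util.concurrent.atomic.AtomicBoolean

    object FdPermits {
      val permits = new Semaphore(8192)  // assumed per-process fd budget
    }

    object LimitedFileInputStream {
      def open(file: File): FileInputStream = {
        FdPermits.permits.acquire()      // block until an fd permit is free
        try {
          new FileInputStream(file) {
            private val released = new AtomicBoolean(false)
            override def close(): Unit =
              try super.close() finally {
                if (released.compareAndSet(false, true)) FdPermits.permits.release()
              }
          }
        } catch {
          case e: Throwable => FdPermits.permits.release(); throw e
        }
      }
    }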




[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1722#discussion_r15725724
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
   private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: 
ArrayBuffer[Int]) = buf1 ++= buf2
 
+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+val conf = new SparkConf(loadDefaults)
+// Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+// for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+conf.set("spark.serializer.objectStreamReset", "0")
+conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+// Ensure that we actually have multiple batches per spill file
+conf.set("spark.shuffle.spill.batchSize", "10")
--- End diff --

Actually I should have made it 0. The point of forcing a reset after each object 
(with the corresponding Java serializer change to enable it) is that we want a 
TC_RESET written after the object and before close. Otherwise the 
bug is not exposed.
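
To make that concrete, a tiny sketch of the JDK-level behaviour (nothing Spark 
specific): ObjectOutputStream.reset() emits a TC_RESET marker into the stream, so 
with a reset after every object there are bytes after the last object in a batch - 
which is exactly what the offset-based batch reading has to cope with.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject("some record")
    out.flush()
    val afterObject = bytes.size()
    out.reset()    // writes the TC_RESET marker
    out.flush()
    println(s"bytes after object = $afterObject, after TC_RESET = ${bytes.size()}")
    out.close()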




[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1722#discussion_r15725858
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
   private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: 
ArrayBuffer[Int]) = buf1 ++= buf2
 
+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+val conf = new SparkConf(loadDefaults)
+// Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+// for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+conf.set("spark.serializer.objectStreamReset", "0")
+conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+// Ensure that we actually have multiple batches per spill file
+conf.set("spark.shuffle.spill.batchSize", "10")
--- End diff --

Oops, I misread.
You are right, it looks fine (as long as the batch size is below 20, it should be 
exhibited).




[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15725875
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+  var minRegisteredRatio =
+math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+  // Submit tasks after maxRegisteredWaitingTime milliseconds
+  // if minRegisteredRatio has not yet been reached  
   val maxRegisteredWaitingTime =
-conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
--- End diff --

interface change ?




[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-08-01 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15725931
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+  var minRegisteredRatio =
+math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+  // Submit tasks after maxRegisteredWaitingTime milliseconds
+  // if minRegisteredRatio has not yet been reached  
   val maxRegisteredWaitingTime =
-conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
--- End diff --

I think this was introduced in 1.1 ? If yes, ok to rename




[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-02 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1722#discussion_r15728047
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with 
LocalSparkContext {
   private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
   private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: 
ArrayBuffer[Int]) = buf1 ++= buf2
 
+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+val conf = new SparkConf(loadDefaults)
+// Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+// for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+conf.set("spark.serializer.objectStreamReset", "0")
+conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+// Ensure that we actually have multiple batches per spill file
+conf.set("spark.shuffle.spill.batchSize", "10")
--- End diff --

BTW, was the Java serializer fix also ported across? I don't think 
@aarondav did that ...
Else the objectStreamReset = 0 will cause the serializer to ignore it (in master it 
used to check for value > 0)





[GitHub] spark pull request: [Minor] Fixes on top of #1679

2014-08-02 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1736#discussion_r15728056
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala ---
@@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: 
BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name(memory, memUsed_MB), new 
Gauge[Long] {
 override def getValue: Long = {
   val storageStatusList = blockManager.master.getStorageStatus
-  val maxMem = storageStatusList.map(_.maxMem).sum
-  val remainingMem = storageStatusList.map(_.memRemaining).sum
-  (maxMem - remainingMem) / 1024 / 1024
+  val memUsed = storageStatusList.map(_.memUsed).sum
+  memUsed / 1024 / 1024
--- End diff --

nit: / (1024 * 1024) instead ?





[GitHub] spark pull request: [Minor] Fixes on top of #1679

2014-08-02 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1736#discussion_r15732438
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala ---
@@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: 
BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name(memory, memUsed_MB), new 
Gauge[Long] {
 override def getValue: Long = {
   val storageStatusList = blockManager.master.getStorageStatus
-  val maxMem = storageStatusList.map(_.maxMem).sum
-  val remainingMem = storageStatusList.map(_.memRemaining).sum
-  (maxMem - remainingMem) / 1024 / 1024
+  val memUsed = storageStatusList.map(_.memUsed).sum
+  memUsed / 1024 / 1024
--- End diff --

bad code is bad code :-)





[GitHub] spark pull request: [Minor] Fixes on top of #1679

2014-08-02 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1736#discussion_r15732470
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala ---
@@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: 
BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name(memory, memUsed_MB), new 
Gauge[Long] {
 override def getValue: Long = {
   val storageStatusList = blockManager.master.getStorageStatus
-  val maxMem = storageStatusList.map(_.maxMem).sum
-  val remainingMem = storageStatusList.map(_.memRemaining).sum
-  (maxMem - remainingMem) / 1024 / 1024
+  val memUsed = storageStatusList.map(_.memUsed).sum
+  memUsed / 1024 / 1024
--- End diff --

Btw, it is just a nit - so please don't let this block a commit!





[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-03 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1722#issuecomment-50992153
  
LGTM, thanks Matei !
On 03-Aug-2014 12:13 pm, Matei Zaharia notificati...@github.com wrote:

 @aarondav https://github.com/aarondav / @mridulm
 https://github.com/mridulm any other comments on this, or is it okay to
 merge?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1722#issuecomment-50983403.






[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-03 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1722#issuecomment-50992283
  
Oh wait, is the Java serializer change also ported ?
Else the tests won't do what we want them to do.
On 03-Aug-2014 8:11 pm, Mridul Muralidharan mri...@gmail.com wrote:

 LGTM, thanks Matei !
 On 03-Aug-2014 12:13 pm, Matei Zaharia notificati...@github.com wrote:

 @aarondav https://github.com/aarondav / @mridulm
 https://github.com/mridulm any other comments on this, or is it okay
 to merge?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1722#issuecomment-50983403.







[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-03 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1722#issuecomment-51003282
  
LGTM !
Though I would prefer if @aarondav also took a look at it - since this is 
based on my earlier work, I might be too close to it to see potential issues ...





[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-04 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1722#discussion_r15750212
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -35,16 +35,15 @@ private[spark] class JavaSerializationStream(out: 
OutputStream, counterReset: In
   /**
* Calling reset to avoid memory leak:
* 
http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
-   * But only call it every 10,000th time to avoid bloated serialization 
streams (when
+   * But only call it every 100th time to avoid bloated serialization 
streams (when
* the stream 'resets' object class descriptions have to be re-written)
*/
   def writeObject[T: ClassTag](t: T): SerializationStream = {
 objOut.writeObject(t)
+counter += 1
  if (counterReset > 0 && counter >= counterReset) {
--- End diff --

This is the right behavior, but is a slight change ... I don't think anyone 
is expecting the earlier behavior though !





[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

2014-08-04 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1722#issuecomment-51047651
  
LGTM !





[GitHub] spark pull request: [SPARK-2856] Decrease initial buffer size for ...

2014-08-05 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1780#issuecomment-51168402
  
IIRC if kryo can't hold the entire serialized object in the buffer, it throws up; 
we saw issues with it being as high as 256 KB for some of our jobs, though 
we were using a different API (and it was a slightly more complicated use case).
Might be a good idea to verify this before changing the default.
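
One way to verify it independently of Spark (a hedged sketch against the Kryo API 
directly): with a fixed-size Output buffer that is not allowed to grow, serializing 
anything larger than the buffer throws a KryoException ("Buffer overflow"), which is 
the failure mode I am worried about.

    import com.esotericsoftware.kryo.{Kryo, KryoException}
    import com.esotericsoftware.kryo.io.Output

    val kryo = new Kryo()
    val out = new Output(64 * 1024, 64 * 1024)   // 64 KB buffer, growth capped at 64 KB
    try {
      kryo.writeClassAndObject(out, Array.fill(1 << 20)(0.toByte))  // ~1 MB payload
    } catch {
      case e: KryoException => println("overflow as expected: " + e.getMessage)
    }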





[GitHub] spark pull request: [SPARK-2503] Lower shuffle output buffer (spar...

2014-08-05 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1781#issuecomment-51169641
  
We are running this with 8k or so :-)





[GitHub] spark pull request: Expose aplication ID in ApplicationStart event...

2014-08-05 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/1218#discussion_r15827428
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1531,18 +1532,6 @@ object SparkContext extends Logging {
 throw new SparkException(YARN mode not available ?, e)
   }
 }
-val backend = try {
-  val clazz =
-
Class.forName(org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend)
-  val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], 
classOf[SparkContext])
-  cons.newInstance(scheduler, 
sc).asInstanceOf[CoarseGrainedSchedulerBackend]
-} catch {
-  case e: Exception = {
-throw new SparkException(YARN mode not available ?, e)
-  }
-}
-scheduler.initialize(backend)
-scheduler
--- End diff --

The reflection code exists since the class can't be loaded in all envs.
Tom, has this changed recently?
On 05-Aug-2014 10:27 pm, Marcelo Vanzin notificati...@github.com wrote:

 In core/src/main/scala/org/apache/spark/SparkContext.scala:

  @@ -1531,18 +1532,6 @@ object SparkContext extends Logging {
   throw new SparkException(YARN mode not available ?, e)
 }
   }
  -val backend = try {
  -  val clazz =
  -
Class.forName(org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend)
  -  val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], 
classOf[SparkContext])
  -  cons.newInstance(scheduler, 
sc).asInstanceOf[CoarseGrainedSchedulerBackend]
  -} catch {
  -  case e: Exception = {
  -throw new SparkException(YARN mode not available ?, e)
  -  }
  -}
  -scheduler.initialize(backend)
  -scheduler

 Maybe I'm missing something, but this is just removing a bunch of
 reflection code and replacing it with a single line later on (in
 YarnClusterScheduler):

 initialize(new YarnClusterSchedulerBackend(this, sc))

 This looks much, much easier to read and cleaner to me, but if you guys
 somehow feel so strongly about it, I can revert the change.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1218/files#r15825749.






[GitHub] spark pull request: [SPARK-2856] Decrease initial buffer size for ...

2014-08-05 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1780#issuecomment-51269524
  
Hi @pwendell, my observation about buffer size was not in the context of Spark 
... we saw issues which looked like buffer overflow when the serialized 
object graph was large, and it was not handling the buffer growth properly.
Fortunately, this was due to a bug in our code to begin with (the object being 
serialized was holding an unneeded reference to a large graph of objects - 
running into an MB or so), so we did not need to pursue it much.
But having seen something which should have been handled anyway, I want to 
make sure that changing the default does not cause surprises to our users.

If there are issues with buffer growth, and we lower the limit, a lot of 
jobs will start failing on release.

Given some of the past bugs we have fixed @pwendell (the flush issue comes 
to mind for example !), I am very wary of kryo - when it works, it is great; the 
rest is suspect :-)





[GitHub] spark pull request: Turn UpdateBlockInfo into case class.

2014-08-10 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1872#issuecomment-51710872
  
If it is a case class, does it still need to be Externalizable ?
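
To make the question concrete (field names are hypothetical, not the actual 
UpdateBlockInfo): a case class is already Serializable for free, so keeping 
Externalizable only buys anything if the hand-written encoding is meaningfully 
smaller or faster than the default one.

    // serializable by default - no Externalizable needed
    case class UpdateInfo(blockId: String, size: Long)

    // what keeping Externalizable looks like on a plain class
    class UpdateInfoExt(var blockId: String, var size: Long) extends java.io.Externalizable {
      def this() = this(null, 0L)  // Externalizable requires a public no-arg constructor
      def writeExternal(out: java.io.ObjectOutput): Unit = {
        out.writeUTF(blockId); out.writeLong(size)
      }
      def readExternal(in: java.io.ObjectInput): Unit = {
        blockId = in.readUTF(); size = in.readLong()
      }
    }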





[GitHub] spark pull request: [SPARK-2952] Enable logging actor messages at ...

2014-08-11 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1870#issuecomment-51850204
  
Just saw this as part of the close, sorry for the late comment.

Also, some of the INFO messages which are useful have now become DEBUG ? 
Makes it slightly harder to pinpoint an issue (e.g. "Stopping 
BlockManagerMaster").
Spark logs are already hitting 60 GB for some of our jobs in DEBUG mode !

Btw, if we are logging all messages, would it not be better to use TRACE 
instead of DEBUG ?





[GitHub] spark pull request: [SPARK-2931] In TaskSetManager, reset currentL...

2014-08-11 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1896#issuecomment-51850469
  
It is just a modification of the test above it :-)
Maybe some copy-paste error ? That line is not required for this test btw - 
just the last line validates the issue.

We might want to add something to check that the new value for the index is 
'sane' based on your change @JoshRosen ; that was not done in my test.





[GitHub] spark pull request: [SPARK-2952] Enable logging actor messages at ...

2014-08-11 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1870#issuecomment-51850828
  
Unfortunately, in most cases, we won't know what the issue is other than by bug 
hunting in the logs.
So debug logging gets enabled for a wide swathe of packages.
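
What usually keeps that manageable (a small sketch using the log4j 1.x API that 
Spark currently ships with): leave the root logger at INFO and turn DEBUG on only 
for the package under suspicion, instead of flipping everything to DEBUG.

    import org.apache.log4j.{Level, Logger}

    Logger.getRootLogger.setLevel(Level.INFO)
    Logger.getLogger("org.apache.spark.storage").setLevel(Level.DEBUG)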





[GitHub] spark pull request: [SPARK-2931] In TaskSetManager, reset currentL...

2014-08-11 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1896#issuecomment-51852483
  
It has to do with FakeRackUtil used in that class.
I guess not all tests clean it up properly after assigning to it, which is 
why host2 (or host1 depending on order) ends up being RACK_LOCAL.

In our test, we are not expecting it to be part of any rack.





[GitHub] spark pull request: [SPARK-2931] In TaskSetManager, reset currentL...

2014-08-11 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1896#issuecomment-51852593
  
To be deterministic, you can add FakeRackUtil.cleanup at the beginning of this 
test too.

Though ideally we should do it in the tests which add hosts to a rack (and not 
clean up in tests which don't care about it) I guess. 





[GitHub] spark pull request: [SPARK-3875] Add TEMP DIRECTORY configuration

2014-10-09 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2729#issuecomment-58473957
  
At least for YARN, this will create issues if overridden from the default.
Not sure about Mesos.

Why not use the standard Java property and define it for local and standalone modes 
where relevant?





[GitHub] spark pull request: [SPARK-3875] Add TEMP DIRECTORY configuration

2014-10-09 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2729#issuecomment-58479810
  
There is a Java property which controls this ... java.io.tmpdir (a minimal example 
is sketched below the quoted reply).
On 09-Oct-2014 1:22 pm, 刘钰帆 notificati...@github.com wrote:

 @mridulm https://github.com/mridulm Using std java property is fine.
 Just add a more specific configuration argument.

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/2729#issuecomment-58474600.
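
To be concrete (just a sketch; paths are deployment-specific and whether the 
driver-side option applies depends on how the JVM is launched), the property can be 
routed through the existing JVM-options knobs instead of a new Spark config:

    // executor JVMs: via the existing config, applied at executor launch
    val conf = new org.apache.spark.SparkConf()
      .set("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=/data/tmp")

    // driver / local mode: the JVM has to be started with it, e.g.
    //   spark-submit --driver-java-options "-Djava.io.tmpdir=/data/tmp" ...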






[GitHub] spark pull request: [SPARK-3889] Attempt to avoid SIGBUS by not mm...

2014-10-10 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2742#issuecomment-58724312
  
This needs to be configurable ... IIRC 1.1 had this customizable.
Different limits exist for VM vs heap memory in YARN (for example).





[GitHub] spark pull request: [SPARK-3889] Attempt to avoid SIGBUS by not mm...

2014-10-10 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2742#issuecomment-58728241
  
With 1.1, in experiments, we have done both: depending on whether our user code
is mmap'ing too much data (and so we pull things into the heap .. using
libraries not in our control :-) ); decreasing it when heap is at a premium.
On 11-Oct-2014 4:42 am, Aaron Davidson notificati...@github.com wrote:

 @mridulm https://github.com/mridulm Could you give an example of which
 way you would want to shift it via config? Map more or less often?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/2742#issuecomment-58727337.






[GitHub] spark pull request: [SPARK-3889] Attempt to avoid SIGBUS by not mm...

2014-10-10 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/2742#issuecomment-58728319
  
Note: this is required since there are heap and VM limits enforced, so we
juggle available memory around so that jobs can run to completion!
On 11-Oct-2014 4:56 am, Mridul Muralidharan mri...@gmail.com wrote:

 With 1.1, in expts, we have done both : depending on whether our user code
 is mmap'ing too much data (and so we pull things into heap .. using
 libraries not in our control :-) ); decreasing it when heap is at premium.
 On 11-Oct-2014 4:42 am, Aaron Davidson notificati...@github.com wrote:

 @mridulm https://github.com/mridulm Could you give an example of which
 way you would want to shift it via config? Map more or less often?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/2742#issuecomment-58727337.







[GitHub] spark pull request: SPARK-1937: fix issue with task locality

2014-06-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/892#discussion_r13596860
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -153,8 +153,8 @@ private[spark] class TaskSetManager(
   }
 
   // Figure out which locality levels we have in our TaskSet, so we can do 
delay scheduling
-  val myLocalityLevels = computeValidLocalityLevels()
-  val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to 
wait at each level
+  var myLocalityLevels = computeValidLocalityLevels()
+  var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to 
wait at each level
--- End diff --

Just for clarity : this should end up with only ANY in the locality levels.





[GitHub] spark pull request: SPARK-1937: fix issue with task locality

2014-06-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/892#discussion_r13597675
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -388,7 +386,7 @@ private[spark] class TaskSetManager(
   val curTime = clock.getTime()
 
   var allowedLocality = getAllowedLocalityLevel(curTime)
-  if (allowedLocality  maxLocality) {
+  if (allowedLocality  maxLocality  
myLocalityLevels.contains(maxLocality)) {
 allowedLocality = maxLocality   // We're not allowed to search for 
farther-away tasks
--- End diff --

This is incorrect.
Consider this :
maxLocality == ANY, but allowedLocality == PROCESS_LOCAL (since sufficient 
time has not elapsed to relax further), and myLocalityLevels does not 
contain PROCESS_LOCAL (there are no process-local executors).

Instead of not allowing any task to be executed, we will now allow 
scheduling of tasks with ANY preference.




[GitHub] spark pull request: SPARK-1937: fix issue with task locality

2014-06-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/892#discussion_r13598180
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -388,7 +386,7 @@ private[spark] class TaskSetManager(
   val curTime = clock.getTime()
 
   var allowedLocality = getAllowedLocalityLevel(curTime)
-  if (allowedLocality  maxLocality) {
+  if (allowedLocality  maxLocality  
myLocalityLevels.contains(maxLocality)) {
 allowedLocality = maxLocality   // We're not allowed to search for 
farther-away tasks
   }
--- End diff --

Editing previous comment:
Suppose maxLocality == PROCESS_LOCAL while allowedLocality == RACK_LOCAL 
(by virtue of having waited long enough for a free executor).

Instead of restricting the schedule to at most PROCESS_LOCAL, this will now relax 
it all the way to RACK_LOCAL - which is incorrect (myLocalityLevels might not 
have PROCESS_LOCAL but could have NODE_LOCAL, for example).




[GitHub] spark pull request: SPARK-1937: fix issue with task locality

2014-06-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/892#discussion_r13601836
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -388,7 +386,7 @@ private[spark] class TaskSetManager(
   val curTime = clock.getTime()
 
   var allowedLocality = getAllowedLocalityLevel(curTime)
-  if (allowedLocality  maxLocality) {
+  if (allowedLocality  maxLocality  
myLocalityLevels.contains(maxLocality)) {
 allowedLocality = maxLocality   // We're not allowed to search for 
farther-away tasks
   }
--- End diff --

Back at my desktop, so I can elaborate better.
The issue is that with the relaxed constraint for the executor, some task might get 
scheduled to it which would have been better scheduled on some other executor.

A simple scenario extending my earlier example: suppose there is only one 
task t1 left and two executors become available.
Suppose for exec1 it is RACK_LOCAL while for exec2 it is NODE_LOCAL.

We start with PROCESS_LOCAL as maxLocality - and suppose enough time has 
elapsed that allowedLocality == RACK_LOCAL or ANY.

In this case, if resourceOffer is called on exec1 first, we get a RACK_LOCAL 
schedule; if resourceOffer is called on exec2 first, we get a NODE_LOCAL schedule.

The reason for that if condition was exactly to prevent this. I am actually 
surprised I did not have any testcase to catch this ...
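
A minimal, self-contained sketch of that scenario (the enumeration and values here 
are illustrative, just to show the order dependence):

    object TaskLocality extends Enumeration {
      // smaller = more local; "relaxing" moves to a larger value
      val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
    }
    import TaskLocality._

    val allowedByDelay = RACK_LOCAL      // delay scheduling has already relaxed this far
    val maxLocality    = PROCESS_LOCAL   // this offer round only asks for process-local tasks

    // original guard: clamp to maxLocality, so t1 is not scheduled in this round at all,
    // and a later NODE_LOCAL-constrained round can still hand it to exec2
    val clamped = if (allowedByDelay > maxLocality) maxLocality else allowedByDelay

    // patched guard: when maxLocality is not in myLocalityLevels the clamp is skipped,
    // so t1 goes to whichever of exec1 (RACK_LOCAL) / exec2 (NODE_LOCAL) is offered first
    println(s"clamped = $clamped, relaxed = $allowedByDelay")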




[GitHub] spark pull request: SPARK-1937: fix issue with task locality

2014-06-11 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/892#issuecomment-45732793
  
Just wanted to drop a quick note (since I might not be able to get to this 
until late next week).
I think the proposal should work, though I might be missing something, so 
will let others opine.




[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-11 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-45779752
  
This one slipped off my radar, my apologies.
@tgravescs In #892, if there is even a single executor which is process-local 
with any partition, then we start waiting for all levels based on 
configured timeouts.
Here we are trying to ensure there are sufficient executors available 
before we start accepting jobs. The intent is slightly different.





[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-11 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-45780405
  
Hit submit by mistake, to continue ...
The side effects of not having sufficient executors are different from #892. 
For example, 
a) the default parallelism in YARN is based on the number of executors, 
b) the number of intermediate files per node for shuffle (this can bring 
the node down btw),
c) the amount of memory consumed on a node for MEMORY-persisted RDD data 
(making the job fail if disk is not specified, like some of the MLlib algos ?),
and so on ...




[GitHub] spark pull request: Just a POC for having compression for every RD...

2014-06-15 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1091#issuecomment-46125063
  
Use spark.rdd.compress = true for compressing serialized RDDs.
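
For example (a minimal sketch; the flag only affects serialized storage levels such 
as MEMORY_ONLY_SER, not deserialized in-memory caching):

    val conf = new org.apache.spark.SparkConf()
      .setMaster("local[2]")
      .setAppName("rdd-compress-example")
      .set("spark.rdd.compress", "true")
    val sc = new org.apache.spark.SparkContext(conf)
    val data = sc.parallelize(1 to 1000000)
    data.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)  // stored compressed
    println(data.count())
    sc.stop()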





[GitHub] spark pull request: Compression should be a setting for individual...

2014-06-18 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/1091#issuecomment-46481279
  
Misread the PR and confused it with another pull request, ignore my earlier 
comment.



