[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-55472830

What happens when there is a recomputation that results in the same blockId getting regenerated (unpersist followed by recomputation/persist, or block drop followed by recomputation, or something else)? It will now go to some random node, potentially not the same as the one previously selected? Resulting in over-replication?

A more corner case is if the computation was not idempotent and resulted in a changed dataset for the block: earlier it would get overwritten as part of replication; will we now have two nodes with the same data and a third (the one initially replicated to) which can diverge?

Btw, from what I saw, node loss is not handled, right? So a block can get under-replicated? Would be nice if we added that some day ...

Streaming is not the only application for replication :-) We use it in conjunction with locality wait levels to speed up computation when speculative execution is enabled.
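A minimal sketch of the kind of guard this concern points at (all names hypothetical; this is not the patch's code and Spark's master API differs): before replicating, ask which peers already hold the block and only top up the difference, so a regenerated block does not accumulate extra replicas.

    // Hypothetical sketch: top up replication instead of blindly re-replicating,
    // so a regenerated block does not end up over-replicated.
    def replicateIfNeeded(
        desiredReplicas: Int,
        currentHolders: Set[String],                 // peers already holding the block
        candidates: Seq[String],                     // peers we could replicate to
        upload: String => Boolean): Set[String] = {  // returns true on success
      var holders = currentHolders
      val targets = candidates.filterNot(holders.contains).iterator
      while (holders.size < desiredReplicas && targets.hasNext) {
        val peer = targets.next()
        if (upload(peer)) holders += peer            // only count successful uploads
      }
      holders
    }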
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56293724

@tdas handling (1) deterministically will make (2) in line with what we currently have. And that should be sufficient imo. (3) was not in the context of this patch, but a general shortcoming of spark currently. Alleviating (3) might be complicated (not sure how much so), but will have some very interesting consequences for performance (among others). For example: this prevents us from using block persistence for checkpoints - there was a discussion about this in a JIRA a while back (forgot the id) ... resolving this, combined with 3x-replicated blocks, would mean we get really cheap and very performant checkpointing (while having fault tolerance on par with hdfs).
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17833363

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +789,88 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
+    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+
+    cachedPeers.synchronized {
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+    }
+    cachedPeers
+  }
+
+  /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersReplicatedTo = new HashSet[BlockManagerId]
+    val peersFailedToReplicateTo = new HashSet[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.nanoTime
+    val random = new Random(blockId.hashCode)
+
+    var forceFetchPeers = false
+    var failures = 0
+    var done = false
+
+    // Get a random peer
+    def getRandomPeer(): Option[BlockManagerId] = {
+      val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
+      if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None
--- End diff --

This would mean that, assuming there is no change to the number of executors in the system, we will consistently get the same peer back. But even if some unrelated peer was added or removed, we won't get back the same peer we replicated to last time. Did I read that correctly, or am I missing something? Thanks.
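A self-contained toy illustrating the reading above (not the patch's code; peer names are made up): seeding Random with blockId.hashCode makes the draw deterministic, but the element drawn still shifts when any peer joins or leaves the sorted list.

    import scala.util.Random

    val blockIdHash = "rdd_0_0".hashCode
    def pick(peers: Vector[String]): String =
      peers(new Random(blockIdHash).nextInt(peers.size))

    val before = Vector("bm-a", "bm-b", "bm-c").sortBy(_.hashCode)
    val after  = Vector("bm-a", "bm-b", "bm-c", "bm-d").sortBy(_.hashCode) // one unrelated peer added

    // Same seed, same blockId -- but nextInt(3) and nextInt(4) need not land
    // on the same peer, so the replication target can change.
    println(s"before: ${pick(before)}, after: ${pick(after)}")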
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17833383

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
[same hunk as above, continuing into the replication retry loop:]
+    // One by one choose a random peer and try uploading the block to it.
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from the driver and then pick another random peer for replication. Also
+    // temporarily blacklist the peer for which replication failed.
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.nanoTime
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
+              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+              .format((System.nanoTime - onePeerStartTime) / 1e6))
+            peersReplicatedTo += peer
+            forceFetchPeers = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true
+            }
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              forceFetchPeers = true
+              peersFailedToReplicateTo += peer
--- End diff --

Ideally, we might want to cache this peersFailedToReplicateTo across block updates for a short TTL (to temporarily blacklist replication to the peer). But that can be done in a future PR.
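A minimal sketch of that future-PR idea, under assumed names (this is illustrative, not proposed code): a small map recording when replication to a peer last failed, consulted before peer selection.

    import scala.collection.mutable

    // Hypothetical helper: peers that recently failed replication are skipped
    // until ttlMs has elapsed since the failure.
    class PeerBlacklist(ttlMs: Long) {
      private val failedAt = mutable.Map.empty[String, Long]

      def markFailed(peer: String): Unit = synchronized {
        failedAt(peer) = System.currentTimeMillis
      }

      def isBlacklisted(peer: String): Boolean = synchronized {
        failedAt.get(peer) match {
          case Some(t) if System.currentTimeMillis - t < ttlMs => true
          case Some(_) => failedAt.remove(peer); false  // entry expired, drop it
          case None => false
        }
      }
    }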
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17833419

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
[same hunk as above; the comment is on the catch block of the replication retry loop:]
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              forceFetchPeers = true
+              peersFailedToReplicateTo += peer
--- End diff --

Btw, curious: will replication fail only when the remote peer is dead (and so require forceFetchPeers)? What about inability to add the block in the remote peer? Will that cause an exception to be raised here? Essentially I am trying to understand whether an exception raised here always means the remote peer is 'dead'. An alternative might be to list peers which have at least data.rewind().remaining() space available: but we don't support that, iirc (and it can get used up before we make this call anyway, I guess).
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17833483

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
[same hunk as above, continuing to the end of the retry loop:]
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              forceFetchPeers = true
+              peersFailedToReplicateTo += peer
+              if (failures > maxReplicationFailures) {
+                done = true
+              }
+          }
+        case None =>
+          // no peer left to replicate to
+          done = true
--- End diff --

What if the initial list had only self in the executor list and we are within the TTL (and so getPeers returns an empty list)? Bootstrapping time, for example. Do we want to check if the server has updates for us? This will kind of hose our TTL though ... but maybe it's a corner case. Or is this handled already? Thanks
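One possible handling of the bootstrap case above, sketched with hypothetical names (not the patch's code): if filtering the cached list leaves nothing, bypass the TTL once and re-ask the master before concluding there is no peer left.

    // Hypothetical sketch: bypass the TTL once when the cached peer list
    // filters down to empty, instead of giving up inside the TTL window.
    def candidatePeers(
        cached: Set[String],
        excluded: Set[String],  // already replicated to, or recently failed
        fetchFromMaster: () => Set[String]): Set[String] = {
      val remaining = cached -- excluded
      if (remaining.nonEmpty) remaining
      else fetchFromMaster() -- excluded  // forces a refresh; hoses the TTL, as noted
    }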
[GitHub] spark pull request: SPARK-3561 - Pluggable strategy to facilitate ...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2422#issuecomment-56373277

Is there an example of how this is going to be leveraged? The default case is the simple version delegating to existing spark - it would be good to see how this is used against tez, so that there is a clearer picture of how the design changes fit into the overall changes proposed, and what further changes (if any) might be required in case we need to generalize the interface. As @pwendell mentioned, currently this aspect is not exposed - and if we want to expose it via an SPI, we need to have a better understanding of how it will be leveraged.

Also, would Experimental make more sense than DeveloperApi (until it stabilizes)?
[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1486#issuecomment-56480392

Are we proposing to introduce hdfs caching tags/idioms directly into TaskSetManager in this pr? That does not look right. We need to generalize this so that any rdd can specify process/host (maybe rack also?) annotations. Once done, HadoopRdd can leverage that. Depending on an underscore not being in the name, etc., is fragile. One option would be to define our own URIs, with the default reverting to host only.
[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1486#issuecomment-56506066

@pwendell This is not hadoop RDD specific functionality - it is a general requirement which can be leveraged by any RDD in spark - and hadoop RDD currently happens to have a use case for this when dfs caching is used. The fact that the preferred location is currently a String might be the limitation here: and so extending it for a URI or whatever else will add overhead (including the current patch). For example: an RDD which pulls data from tachyon or other distributed memory stores, loading data into accelerator cards and specifying process-local locality for the block, etc. are all uses of the same functionality imo. If not addressed properly, when the next similar requirement comes along, either we will be rewriting this code, or adding more surgical hacks along the same lines.

If the expectation is that spark won't need to support these other requirements [1], then we can definitely punt on doing a proper design change. Given this is not a user-facing change (right?), we can definitely take the current approach and replace it later; or do a more principled solution upfront.

@kayousterhout @markhamstra @mateiz any thoughts, given this modifies TaskSetManager for the addition of this feature?

[1] which is unlikely given mllib's rapid pace of development - it is fairly inevitable that we'll need to support accelerator cards sooner rather than later, at least given the arc of our past efforts with ml on spark.
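A sketch of the URI idea mentioned in the previous message (the scheme and all names here are entirely hypothetical, not Spark's): encode the locality level in the preferred-location string and revert to host-only for plain strings, preserving today's behavior.

    sealed trait TaskLocation
    case class HostLocation(host: String) extends TaskLocation
    case class ProcessLocation(host: String, executorId: String) extends TaskLocation

    // Hypothetical parsing: "executor://host/execId" pins to a process;
    // anything else is treated as a bare hostname, as today.
    def parseLocation(pref: String): TaskLocation = {
      if (pref.startsWith("executor://")) {
        val rest = pref.stripPrefix("executor://")
        val Array(host, execId) = rest.split("/", 2)  // sketch: no malformed-input handling
        ProcessLocation(host, execId)
      } else {
        HostLocation(pref)
      }
    }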
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/2366#discussion_r17927862

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -787,31 +790,111 @@ private[spark] class BlockManager(
   }

   /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): mutable.HashSet[BlockManagerId] = {
+    cachedPeers.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+    }
+    cachedPeers
+  }
--- End diff --

There is an MT bug here. Since cachedPeers is updated in place, it is possible for a 'previous' invocation to be using cachedPeers while the next invocation is clearing/updating it. We can avoid that by overwriting the cachedPeers instance variable with the result of master.getPeers.
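A sketch of the suggested fix (class and field names illustrative, not the actual BlockManager code): publish a fresh immutable snapshot through the instance variable instead of mutating the cached collection in place, so in-flight readers keep a consistent view.

    // Hypothetical sketch: swap in a new immutable set, never mutate in place.
    class PeerCache(fetch: () => Seq[String], ttlMs: Long) {
      @volatile private var cachedPeers: Set[String] = Set.empty
      @volatile private var lastFetch = 0L

      def getPeers(forceFetch: Boolean): Set[String] = {
        def stale = System.currentTimeMillis - lastFetch > ttlMs
        if (cachedPeers.isEmpty || forceFetch || stale) synchronized {
          if (cachedPeers.isEmpty || forceFetch || stale) {  // re-check under the lock
            cachedPeers = fetch().toSet                      // swap, never clear/update
            lastFetch = System.currentTimeMillis
          }
        }
        cachedPeers  // an immutable snapshot: safe even if replaced concurrently
      }
    }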
[GitHub] spark pull request: [SPARK-3495] Block replication fails continuou...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2366#issuecomment-56566367

@tdas In case I did not mention it before :-) this is definitely a great improvement over what existed earlier! I would love it if we could (sometime soon, I hope) add support for re-replication of blocks due to lost executors: which, currently, is outside the scope of this PR it seems. Other than the MT bug I mentioned above, this looks good to me!
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49804353

We saw a bunch of EOFExceptions from SpillReader:

java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2577)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1315)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:79)
    at org.apache.spark.util.collection.ExternalSorter$SpillReader.org$apache$spark$util$collection$ExternalSorter$SpillReader$$readNextItem(ExternalSorter.scala:526)
    at org.apache.spark.util.collection.ExternalSorter$SpillReader$$anon$6.hasNext(ExternalSorter.scala:560)
    at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.shuffle.sort.SortShuffleWriter$$anonfun$write$2.apply(SortShuffleWriter.scala:93)
    at org.apache.spark.shuffle.sort.SortShuffleWriter$$anonfun$write$2.apply(SortShuffleWriter.scala:89)

(The exact lines might not match master, but I hope the issue is conveyed.)
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15259118

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,573 @@
+[standard Apache license header]
+
+package org.apache.spark.util.collection
+
+import java.io._
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.BlockId
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. Can output a single
+ * partitioned file with a different byte range for each partition, suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for merging data
+ * @param partitioner optional partitioner; if given, sort by partition ID and then key
+ * @param ordering optional ordering to sort keys within each partition
+ * @param serializer serializer to use when spilling to disk
+ */
+private[spark] class ExternalSorter[K, V, C](
+    aggregator: Option[Aggregator[K, V, C]] = None,
+    partitioner: Option[Partitioner] = None,
+    ordering: Option[Ordering[K]] = None,
+    serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf = SparkEnv.get.conf
+  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val serializerBatchSize = conf.getLong("spark.shuffle.spill.batchSize", 1)
+
+  private def getPartition(key: K): Int = {
+    if (shouldPartition) partitioner.get.getPartition(key) else 0
+  }
+
+  // Data structures to store in-memory objects before we spill. Depending on whether we have an
+  // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
+  // store them in an array buffer.
+  var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+  var buffer = new SizeTrackingBuffer[((Int, K), C)]
+
+  // Track how many elements we've read before we try to estimate memory. Ideally we'd use
+  // map.size or buffer.size for this, but because users' Aggregators can potentially increase
+  // the size of a merged element when we add values with the same key, it's safer to track
+  // elements read from the input iterator.
+  private var elementsRead = 0L
+  private val trackMemoryThreshold = 1000
+
+  // Spilling statistics
+  private var spillCount = 0
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
+
+  // Collective memory threshold shared across all running tasks
+  private val maxMemoryThreshold = {
+    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.3)
+    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+  }
+
+  // A comparator for keys K that orders them within a partition to allow partial aggregation.
+  // Can be a partial ordering by hash code if a total ordering is not provided through
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15259190

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[same quoted context as the previous message]
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49833579

I had pulled about 20 mins after I mailed you ... I have elaborated on why this occurs inline in the code - we can ignore it for now though, since it happens even in the 'regular' case: we had fixed it so long ago that I had forgotten about it and assumed it was some other issue! We can punt on resolving this for now, and address it when the consolidated shuffle pr comes out.
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15274240

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -0,0 +1,649 @@
+[license header and imports as in the earlier quote]
+
+/**
+ * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner
+ * pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then
+ * optionally sorts keys within each partition using a custom Comparator. Can output a single
+ * partitioned file with a different byte range for each partition, suitable for shuffle fetches.
+ *
+ * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
+ *
+ * @param aggregator optional Aggregator with combine functions to use for merging data
+ * @param partitioner optional Partitioner; if given, sort by partition ID and then key
+ * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
+ * @param serializer serializer to use when spilling to disk
+ *
+ * Note that if an Ordering is given, we'll always sort using it, so only provide it if you really
+ * want the output keys to be sorted. In a map task without map-side combine for example, you
+ * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do
+ * want to do combining, having an Ordering is more efficient than not having it.
+ *
+ * At a high level, this class works as follows:
+ *
+ * - We repeatedly fill up buffers of in-memory data, using either a SizeTrackingAppendOnlyMap if
+ *   we want to combine by key, or a simple SizeTrackingBuffer if we don't. Inside these buffers,
+ *   we sort elements of type ((Int, K), C) where the Int is the partition ID. This is done to
+ *   avoid calling the partitioner multiple times on the same key (e.g. for RangePartitioner).
+ *
+ * - When each buffer reaches our memory limit, we spill it to a file. This file is sorted first
+ *   by partition ID and possibly second by key or by hash code of the key, if we want to do
+ *   aggregation. For each file, we track how many objects were in each partition in memory, so we
+ *   don't have to write out the partition ID for every element.
+ *
+ * - When the user requests an iterator, the spilled files are merged, along with any remaining
+ *   in-memory data, using the same sort order defined above (unless both sorting and aggregation
+ *   are disabled). If we need to aggregate by key, we either use a total ordering from the
+ *   ordering parameter, or read the keys with the same hash code and compare them with each other
+ *   for equality to merge values.
+ *
+ * - Users are expected to call stop() at the end to delete all the intermediate files.
+ */
+private[spark] class ExternalSorter[K, V, C](
+    aggregator: Option[Aggregator[K, V, C]] = None,
+    partitioner: Option[Partitioner] = None,
+    ordering: Option[Ordering[K]] = None,
+    serializer: Option[Serializer] = None) extends Logging {
+
+  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
+  private val shouldPartition = numPartitions > 1
+
+  private val blockManager = SparkEnv.get.blockManager
+  private val diskBlockManager = blockManager.diskBlockManager
+  private val ser = Serializer.getSerializer(serializer)
+  private val serInstance = ser.newInstance()
+
+  private val conf
[GitHub] spark pull request: SPARK-2634: Change MapOutputTrackerWorker.mapS...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1541#issuecomment-49855865

Instead of a ConcurrentHashMap, we should actually move this to a disk-backed map: the cleanup of this datastructure is painful, and it can become extremely large, particularly for iterative algorithms. Fortunately, in most cases we just need the last few entries, so the LRU scheme used by most disk-backed maps works beautifully. We have been using mapdb for this in MapOutputTrackerWorker, and it has worked beautifully.

@rxin might be particularly interested since he is looking into reducing the memory footprint of spark. CC @mateiz - this is what I had mentioned earlier.
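A sketch of the LRU behavior being relied on, using a plain JDK LinkedHashMap rather than mapdb's actual API (which is not quoted here): the most recent entries stay in memory and older ones are handed to an eviction callback, which could write them through to disk.

    import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

    // Access-ordered map that evicts the least recently used entry once
    // maxEntries is exceeded; onEvict could spill the entry to disk.
    class LruMap[K, V](maxEntries: Int, onEvict: (K, V) => Unit)
      extends JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {

      override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = {
        val evict = size() > maxEntries
        if (evict) onEvict(eldest.getKey, eldest.getValue)  // e.g. write through to disk
        evict  // returning true lets LinkedHashMap drop the eldest entry
      }
    }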
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15288486

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
[same quoted context as the previous ExternalSorter message]
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49949511

@mateiz The total memory overhead actually goes much higher than num_streams, right? It should be on the order of num_streams + num_values for this key. For fairly large values, the latter might fit into memory, but the former might not (particularly as the number of mappers increases). Or did I get this wrong from the PR?
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-50115353

Running tests with export SPARK_JAVA_OPTS=-Dspark.shuffle.manager=org.apache.spark.shuffle.sort.SortShuffleManager causes:

'''
- sorting using mutable pairs *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:1 failed 4 times, most recent failure: Exception failure in TID 14 on host localhost: java.lang.ArrayStoreException: scala.Tuple2
    scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
    scala.Array$.slowcopy(Array.scala:81)
    scala.Array$.copy(Array.scala:107)
    scala.collection.mutable.ResizableArray$class.copyToArray(ResizableArray.scala:77)
    scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:47)
    scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:241)
    scala.collection.AbstractTraversable.copyToArray(Traversable.scala:105)
    scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:249)
    scala.collection.AbstractTraversable.toArray(Traversable.scala:105)
    scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)
    org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:61)
    org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:581)
    org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:581)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
    org.apache.spark.scheduler.Task.run(Task.scala:51)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1055)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1039)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1037)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1037)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:641)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:641)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:641)
  ...
'''

The actual classes are:

'''
classes = [class scala.Tuple2] ; [class scala.Tuple2, class scala.Tuple2, class scala.Tuple2]
'''

obtained using:

'''
def collect(): Array[T] = {
  println("classes = " + sc.runJob(this, (iter: Iterator[T]) =>
      iter.map(v => if (v == null) null else v.getClass).mkString("[", ",\n\t\t", "]"))
    .mkString("", "; \n\t", ""))
  System.out.flush()
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
'''

in RDD.collect
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-50115453

BTW, this is one of 5 failures from core. I hope there are no merge issues though.
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-50116492

Ah, thanks! Rerunning with 9c29957. Can't pull the pr, and manual merge is painful, hence the delays in testing :-)
[GitHub] spark pull request: [SPARK-2671] BlockObjectWriter should create p...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1580#issuecomment-50246267

Actually we have also seen this happen multiple times. A few of them have been fixed, but not all have been identified. For example, there is an incorrect DCL for directory creation in spark. The tricky bit is preventing creation of directories when shutdown is happening (either via exit or via a driver message). The consolidated shuffle bug fix includes some attempts to resolve it, but since we could not identify/nail down all cases, we introduced something similar: retry in case there is a FileNotFoundException creating the file stream, but prevent it in case we were going to shut down (so that we don't mess up shutdown hooks trying to remove directories!).

Regards,
Mridul

On Fri, Jul 25, 2014 at 11:41 PM, Matei Zaharia notificati...@github.com wrote:

> But the point above was that the code that creates this object goes through DiskBlockManager.getFile, which already creates any non-existent directories. So I don't think this will be a problem, unless a directory is deleted exactly in the instant when we return a File and we start writing.
>
> Reply to this email directly or view it on GitHub: https://github.com/apache/spark/pull/1580#issuecomment-50184666
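A sketch of the retry-unless-shutting-down idea described above (names hypothetical; not the actual fix): FileOutputStream throws FileNotFoundException when the parent directory has raced away, so retry once, but never recreate directories once shutdown has begun.

    import java.io.{File, FileNotFoundException, FileOutputStream}
    import java.util.concurrent.atomic.AtomicBoolean

    val shuttingDown = new AtomicBoolean(false)  // flipped from a shutdown hook

    def openWithRetry(file: File): FileOutputStream = {
      def open(): FileOutputStream = {
        file.getParentFile.mkdirs()              // recreate if a racing delete removed it
        new FileOutputStream(file)
      }
      try open() catch {
        // Parent dir vanished between mkdirs and open; retry once -- but not if
        // we are shutting down, or we'd recreate the dirs the shutdown hooks
        // are trying to delete.
        case _: FileNotFoundException if !shuttingDown.get => open()
      }
    }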
[GitHub] spark pull request: SPARK-2294: fix locality inversion bug in Task...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1313#issuecomment-50257258

Since all process-local tasks are also node-, rack- and any-level tasks, we will incur the node-local delay also.

On 27-Jul-2014 11:09 am, Matei Zaharia notificati...@github.com wrote:

> I thought that we can skip locality levels in the waiting phase if we have no tasks for them -- that's why we have computeValidLocalityLevels. In the case you mentioned, the valid levels would only be PROCESS_LOCAL and NO_PREFS.
>
> Reply to this email directly or view it on GitHub: https://github.com/apache/spark/pull/1313#issuecomment-50256511
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
GitHub user mridulm opened a pull request: https://github.com/apache/spark/pull/1609

[SPARK-2532] WIP Consolidated shuffle fixes

Status of the PR:
- [X] Cherry pick and merge changes from internal branch to spark master
- [X] Remove WIP comments and 2G branch references
- [X] Tests for BlockObjectWriter
- [ ] Tests for ExternalAppendOnlyMap
- [ ] Tests for MapOutputTracker
- [ ] Tests for ShuffleBlockManager

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1609.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1609

commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan mridul...@apache.org
Date: 2014-07-27T15:23:05Z

    Consolidated shuffle fixes
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15442779 --- Diff: core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala ---
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import java.io.{IOException, FileOutputStream, OutputStream, File}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+/**
+ * Test various code paths in DiskBlockObjectWriter
+ */
+class DiskBlockObjectWriterSuite extends FunSuite {
+
+  private val conf = new SparkConf
+  private val BUFFER_SIZE = 32 * 1024
+
+  private def tempFile(): File = {
+    val file = File.createTempFile("temp_", "block")
+    // We dont want file to exist ! Just need a temp file name
+    file.delete()
+    file
+  }
+
+  private def createWriter(file: File = tempFile()) :
+      (File, DiskBlockObjectWriter) = {
+    file.deleteOnExit()
+
+    (file, new DiskBlockObjectWriter(BlockId("test_1"), file,
+      new JavaSerializer(conf), BUFFER_SIZE, (out: OutputStream) => out, true))
+  }
+
+  test("write after close should throw IOException") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert(file.exists() && file.isFile)
+
+    bow.commitAndClose()
+
+    intercept[IOException] {
+      bow.write("test2")
+    }
+
+    file.delete()
+  }
+
+  test("write after revert should throw IOException") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert(file.exists() && file.isFile)
+
+    bow.revertPartialWritesAndClose()
+
+    intercept[IOException] {
+      bow.write("test2")
+    }
+
+    file.delete()
+  }
+
+  test("create even if directory does not exist") {
+    val dir = File.createTempFile("temp_", "dir")
+    dir.delete()
+
+    val file = new File(dir, "temp.file")
+    file.deleteOnExit()
+
+    val bow = new DiskBlockObjectWriter(BlockId("test_1"), file, new JavaSerializer(conf),
+      BUFFER_SIZE, (out: OutputStream) => out, true)
+
+    bow.write("test")
+    assert(file.exists() && file.isFile)
+    bow.commitAndClose()
+    Utils.deleteRecursively(dir)
+  }
+
+  test("revert of new file should delete it") {
+    val (file, bow) = createWriter()
+    bow.write("test")
+    bow.write("test1")
+    assert(file.exists() && file.isFile)
+
+    bow.revertPartialWritesAndClose()
+    assert(!file.exists())
+    // file.delete()
+  }
+
+  test("revert of existing file should revert it to previous state") {
+    val (file, bow1) = createWriter()
+
+    bow1.write("test")
+    bow1.write("test1")
+    assert(file.exists() && file.isFile)
+
+    bow1.commitAndClose()
+    val length = file.length()
+
+    // reopen same file.
+    val bow2 = createWriter(file)._2
+
+    bow2.write("test3")
+    bow2.write("test4")
+
+    assert(file.exists() && file.isFile)
+
+    bow2.revertPartialWritesAndClose()
+    assert(file.exists())
+    assert(length == file.length())
+    file.delete()
+  }
+
+  test("revert of writer after close should delete if it did not exist earlier") {
+    val (file, bow) = createWriter(tempFile())
+
+    bow.write("test")
+    bow.write("test1")
+    assert(file.exists() && file.isFile)
+
+    bow.commitAndClose()
+    val length = file.length()
+
+    assert(file.exists() && file.isFile)
+    assert(length > 0)
+
+    // Now revert the file, after it has been closed : should delete the file
+    // since it did not exist earlier.
+    bow.revertPartialWritesAndClose()
+    assert(!file.exists())
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15448803 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -935,15 +941,22 @@ private[spark] object Utils extends Logging {
    * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
    * an IllegalStateException.
    */
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
   def inShutdown(): Boolean = {
     try {
+      if (shutdownStarted) return true
--- End diff -- Unfortunately, had to be reverted :-(
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50306648 Accidental close, apologies !
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm closed the pull request at: https://github.com/apache/spark/pull/1609
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50306633 @witgo I did not understand the space issue : stylecheck seems to run fine. Regarding the actual issues : the JIRA lists some of them - unfortunately it is not exhaustive. Spark code assumes a few things :
1) A flush followed by close should not cause additional data to be written to the stream - which is not valid in the general case (close can still write more data).
2) Reading an object from a stream will consume all data written as part of the object - which is not valid in the general case: additional info could have been written after the object was written (like reset markers in java serde). So stream wrapping has to account for that.
3)
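A small, self-contained demonstration of point (2) (illustrative Scala, not Spark code): java serialization can emit a TC_RESET marker after an object, so "bytes of the object" and "bytes in the stream" are not the same thing.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    object ResetMarkerDemo extends App {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)

      out.writeObject("payload")
      out.flush()
      val afterObject = bytes.size()

      // reset() appends a TC_RESET marker *after* the object bytes; a reader
      // that stops exactly at the object boundary leaves this byte unconsumed.
      out.reset()
      out.flush()
      val afterReset = bytes.size()

      println(s"object bytes = $afterObject, with reset marker = $afterReset")
      out.close()
    }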
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
GitHub user mridulm reopened a pull request: https://github.com/apache/spark/pull/1609 [SPARK-2532] WIP Consolidated shuffle fixes Status of the PR:
- [X] Cherry pick and merge changes from internal branch to spark master
- [X] Remove WIP comments and 2G branch references.
- [X] Tests for BlockObjectWriter
- [ ] Tests for ExternalAppendOnlyMap
- [x] Tests for ShuffleBlockManager
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1609.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1609
commit f1182f8a3d3328248d471038d6ab0db6e6a1396d Author: Mridul Muralidharan mridul...@apache.org Date: 2014-07-27T15:23:05Z Consolidated shuffle fixes
commit 66d6ec3f99882ad6062c5bff36f2edb82b0c24c0 Author: Mridul Muralidharan mridul...@apache.org Date: 2014-07-27T15:40:32Z Add missing setShutdownStarted hooks
commit 027c7f18c44c57960a2a94eee961f0aa811e7a34 Author: Mridul Muralidharan mridul...@apache.org Date: 2014-07-27T16:05:31Z stylecheck fixes
commit 195c529c1ae5ffa7e8f9cf6af4df8b9536a39d6a Author: Mridul Muralidharan mridul...@apache.org Date: 2014-07-27T23:46:53Z Fix build, add testcases for DiskBlockManagerSuite
commit 6095545bf55a87ef0b28bc11adc63dcc5b661b6c Author: Mridul Muralidharan mridul...@apache.org Date: 2014-07-27T23:50:45Z Consolidated fixes
commit 1c1faea69d9709c7e65afc9bdd13a8e0d5488c82 Author: Mridul Muralidharan mridul...@apache.org Date: 2014-07-27T23:50:50Z Merge branch 'consolidated_shuffle_fixes' of github.com:mridulm/spark into consolidated_shuffle_fixes
commit fbf20f792baf7ab8d6705c7a9525c2db92bb7ae3 Author: Mridul Muralidharan mridul...@apache.org Date: 2014-07-28T06:59:45Z Disable code to detect programming shutdown via stop's. Make actor/store shutdown in DiskBlockManagerSuite more robust
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50307155 Jenkins, test this please
[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15449433 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
     }
     false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+    if (shutdownStarted) return true
+    doShutdownCheck()
+    shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+    var shutdown = false
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException =>
+        shutdown = true
+    } finally {
+      shutdownStarted = shutdown
+    }
+  }
+  */
--- End diff -- This (and the related commented-out inShutdown references) has been left around in case someone else can suggest an improvement ! The issue is, this works fine in 'normal' use : but while running spark tests in local mode, it fails.
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50455483 All pending fixes and work are done. I dont think there are any pieces missing in the merge from the internal branch to master. Open for review, thanks !
[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1542#issuecomment-50488319 @pwendell @mateiz was this PR really merged into spark ?
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15537366 --- Diff: core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -40,7 +40,7 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
    */
   def writeObject[T: ClassTag](t: T): SerializationStream = {
     objOut.writeObject(t)
-    if (counterReset > 0 && counter >= counterReset) {
+    if (counterReset >= 0 && counter >= counterReset) {
--- End diff -- This was done only to support adding a reset marker after each object has been written. The only practical reason to do this is to test that part.
[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1542#issuecomment-50509704 That was super scary ! Thanks for clarifying @aarondav
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15540734 --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -116,8 +118,13 @@ class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        try {
+          writer.revertPartialWritesAndClose()
+        } catch {
+          // Ensure that all reverts get done - log exception and continue
+          case ex: Exception =>
+            logError("Exception reverting/closing writers", ex)
+        }
       }
--- End diff -- revert/close can throw an exception - causing other writers to be left hanging. Hence, log and continue.
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15540782 --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -71,7 +72,8 @@ class HashShuffleWriter[K, V](
     try {
       return Some(commitWritesAndBuildStatus())
     } catch {
-      case e: Exception =>
+      case e: Throwable =>
+        success = false // for finally block
         revertWrites()
--- End diff -- If success is not set to false, then the release of the writers will attempt to register.
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15540934 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to decide
+  // if we should delete this file or not. Also used to detect if file was deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+  private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L

-  override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created
+  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert (!initialized)
+    assert (!wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (!exists && initiallyExists && 0 != initialPosition && !Utils.inShutdown) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it dont seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && !Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, dont bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
--- End diff -- As mentioned in the comments, this tries to retry once in case the file could not be created because the directory was not present (which is what the FileNotFoundException usually indicates) : except when we are already in shutdown. This is a case which happens due to some race in spark between creation of a directory and allowing files to be created under that directory. We have fixed a double-checked locking bug below (in DiskBlockManager) but it looks like that is not sufficient - since this was observed even after that.
(In our branch, the logDebug is actually logError, to flush out these cases.)
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15541003 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to decide
+  // if we should delete this file or not. Also used to detect if file was deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+  private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L

-  override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created
+  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
+  private def init() {
+    init(canRetry = true)
+  }
+
+  private def init(canRetry: Boolean) {
+
+    if (closed) throw new IOException("Already closed")
+
+    assert (!initialized)
+    assert (!wasOpenedOnce)
+    var exists = false
+    try {
+      exists = file.exists()
+      if (!exists && initiallyExists && 0 != initialPosition && !Utils.inShutdown) {
+        // Was deleted by cleanup thread ?
+        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
+          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
+      }
+      fos = new FileOutputStream(file, true)
+    } catch {
+      case fEx: FileNotFoundException =>
+        // There seems to be some race in directory creation.
+        // Attempts to fix it dont seem to have worked : working around the problem for now.
+        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
+          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
+        if (canRetry && !Utils.inShutdown()) {
+          // try creating the parent directory if that is the issue.
+          // Since there can be race with others, dont bother checking for
+          // success/failure - the call to init() will resolve if fos can be created.
+          file.getParentFile.mkdirs()
+          // Note, if directory did not exist, then file does not either - and so
+          // initialPosition would be zero in either case.
+          init(canRetry = false)
+          return
+        } else throw fEx
+    }
+
+    try {
+      // This is to workaround case where creation of object and actual init
+      // (which can happen much later) happens after a delay and the cleanup thread
+      // cleaned up the file.
+      channel = fos.getChannel
+      val fosPos = channel.position()
+      if (initialPosition != fosPos) {
+        throw new IOException("file cleaned up ? " + file.exists() +
+          ", initialpos = " + initialPosition +
+          " current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
+      }
+
+      ts = new TimeTrackingOutputStream(fos)
+      val bos = new BufferedOutputStream(ts, bufferSize)
+      bs = compressStream(bos)
+      objOut = serializer.newInstance().serializeStream(bs)
+      initialized = true
+      wasOpenedOnce = true;
+    } finally {
+      if (!initialized) {
+        // failed, cleanup state.
+        val tfos = fos
+        updateCloseState()
+        tfos.close()
+      }
+    }
+  }
+
+  private def open(): BlockObjectWriter = {
+    init()
     lastValidPosition = initialPosition
-    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-    objOut = serializer.newInstance().serializeStream(bs)
-    initialized = true
     this
   }

-  override def close() {
-    if (initialized) {
-      if (syncWrites
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15541065 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -188,6 +425,39 @@ private[spark] class DiskBlockObjectWriter(
   // Only valid if called after commit()
   override def bytesWritten: Long = {
-    lastValidPosition - initialPosition
+    val retval = lastValidPosition - initialPosition
+
+    assert(retval >= 0 || Utils.inShutdown(),
+      "exists = " + file.exists() + ", bytesWritten = " + retval +
+      ", lastValidPosition = " + lastValidPosition + ", initialPosition = " + initialPosition +
+      ", in shutdown = " + Utils.inShutdown())
+
+    // TODO: Comment this out when we are done validating : can be expensive due to file.length()
+    assert(file.length() >= lastValidPosition || Utils.inShutdown(),
+      "exists = " + file.exists() + ", file len = " + file.length() +
+      ", bytesWritten = " + retval + ", lastValidPosition = " + lastValidPosition +
+      ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown())
+
+    if (retval >= 0) retval else 0
   }
 }
+
+object DiskBlockObjectWriter {
+
+  // Unfortunately, cant do it atomically ...
+  private def truncateIfExists(file: File, truncatePos: Long) {
+    var fos: FileOutputStream = null
+    try {
+      // There is no way to do this atomically iirc.
+      if (file.exists() && file.length() != truncatePos) {
+        fos = new FileOutputStream(file, true)
+        fos.getChannel.truncate(truncatePos)
+      }
+    } finally {
+      if (fos ne null) {
+        fos.close()
+      }
+    }
+  }
+}
--- End diff -- Hopefully, the rest of the code changes in this class are documented (a bit too heavily probably ? lighten it before final commit ?)
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15541257 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala ---
@@ -236,31 +241,61 @@ object ShuffleBlockManager {
       new PrimitiveVector[Long]()
     }

-    def numBlocks = mapIdToIndex.size
+    /*
+     * This is required for shuffle consolidation to work. In particular when updates to file are
+     * happening while parallel requests to fetch block happens.
+     */
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
+
+    private var numBlocks = 0
--- End diff -- The reason for the change from def to var is perhaps subtle. Consider this sequence:
- add to mapIdToIndex with mapId 0
- add to mapIdToIndex with mapId 1
- add to mapIdToIndex with mapId 0 (on re-execution)
- add to mapIdToIndex with mapId 1 (on re-execution)
Now both 0 and 1 will end up with the same index assigned, since the index was based on mapIdToIndex.size, which no longer grows once both keys exist.
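A tiny standalone illustration of the collision (hypothetical code, not the ShuffleBlockManager source): deriving the index from the map's current size stops producing unique values once a key is re-inserted.

    import scala.collection.mutable

    val mapIdToIndex = mutable.HashMap[Int, Int]()
    def record(mapId: Int): Unit = {
      mapIdToIndex(mapId) = mapIdToIndex.size // index derived from current size
    }

    record(0) // size 0 -> index 0
    record(1) // size 1 -> index 1
    record(0) // re-execution: size stays 2 -> index 2
    record(1) // re-execution: size stays 2 -> index 2, collides!

A separate counter (the private var numBlocks above), incremented on every add, keeps the indices distinct across re-executions.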
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15541435 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
     extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+
+    assert (!batchSizes.isEmpty)
+    assert (!batchSizes.exists(_ <= 0))
+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
--- End diff -- This is to give us the starting offset for each batch, rather than relying on where the last batch's read ended.
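For intuition, a quick example of what scanLeft produces here (the sizes are made up):

    val batchSizes = Seq(100L, 250L, 80L)
    val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
    // batchOffsets == Seq(0L, 100L, 350L, 430L)
    // Batch i occupies [batchOffsets(i), batchOffsets(i + 1)) in the file,
    // and the last entry equals the expected file length - a cheap sanity check.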
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15542308 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -418,7 +459,25 @@ class ExternalAppendOnlyMap[K, V, C](
   // TODO: Ensure this gets called even if the iterator isn't drained.
   private def cleanup() {
-    deserializeStream.close()
+    batchIndex = batchOffsets.length
+    val dstrm = deserializeStream
+    val fstrm = fileStream
+    deserializeStream = null
+    fileStream = null
+
+    if (dstrm ne null) {
+      try {
+        dstrm.close()
+      } catch {
+        case ioEx: IOException => {
+          // best case attempt - atleast free the handles
+          if (fstrm ne null) {
+            try { fstrm.close() } catch { case ioEx: IOException => }
+          }
+          throw ioEx
+        }
+      }
+    }
--- End diff -- This is just more defensive cleanup compared to earlier, plus setting batchIndex to an unusable value - to ensure that any close-related exceptions do not result in unclosed files (which impacts the ulimit of the spark process).
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1609#issuecomment-50517754 I have added some comments to the PR in the hope that they will aid the review. I am sure it is still an involved process in spite of this, so please do feel free to raise as many queries as required : sometimes they might trigger unearthing some other issues as part of the discussion. I want to ensure that we do not miss any subtle issue here.
[GitHub] spark pull request: SPARK-2045 Sort-based shuffle
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-50522967 @mateiz please refer to changes here : https://github.com/apache/spark/pull/1609/files#diff-10 They should be relevant to this PR too
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15565447 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+
+  // Did we create this file or was it already present : used in revert to decide
+  // if we should delete this file or not. Also used to detect if file was deleted
+  // between creation of BOW and its actual init
+  private val initiallyExists = file.exists() && file.isFile
   private val initialPosition = file.length()
   private var lastValidPosition = initialPosition
+  private var initialized = false
+  // closed explicitly ?
+  private var closed = false
+  // Attempt to cleanly close ? (could also be closed via revert)
+  // Note, a cleanly closed file could be subsequently reverted
+  private var cleanCloseAttempted = false
+  // Was the file actually opened atleast once.
+  // Note: initialized/streams change state with close/revert.
+  private var wasOpenedOnce = false
   private var _timeWriting = 0L

-  override def open(): BlockObjectWriter = {
-    fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
-    channel = fos.getChannel()
+  // Due to some directory creation race issues in spark, it has been observed that
+  // sometimes file creation happens 'before' the actual directory has been created
+  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
--- End diff -- As I mentioned above, we did fix a DCL bug, but that did not seem sufficient. I agree this is a rare condition, and the 'fix' is a hack to work around the problem : but pending identifying the root cause, this is the best we have unfortunately. Any thoughts ?
[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1609#discussion_r15565552 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
     }
     false
   }
+  /*
+  @volatile private var shutdownStarted = false
+  private[spark] def setShutdownStarted() {
+    shutdownStarted = true
+  }
+
+  def inShutdown(): Boolean = {
+    if (shutdownStarted) return true
+    doShutdownCheck()
+    shutdownStarted
+  }
+
+  private[spark] def doShutdownCheck() {
+    var shutdown = false
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException =>
+        shutdown = true
+    } finally {
+      shutdownStarted = shutdown
+    }
+  }
+  */
--- End diff -- Sure, will move it out when I push next. It becomes directly relevant to this patch since there are assertions which check that either the file/directory is in the expected state or the VM is in shutdown (and so cleanup happened/is happening - which caused the file deletions). For VM shutdown, this is just an optimization - but for a shutdown ordered by the driver, inShutdown will return false even though the same codepath as shutdown is invoked by spark (stop on various subsystems) : resulting in exceptions/assertion failures in threads which are still running. Unfortunately, this diff interacts badly with local mode - particularly tests, since they keep reusing the same VM. Any ideas on how to 'fix' or resolve this ? Thanks
[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1678#discussion_r15682389 --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
     if (shuffle != null && shuffle.writers != null) {
       for (writer <- shuffle.writers) {
-        writer.revertPartialWrites()
-        writer.close()
+        writer.revertPartialWritesAndClose()
--- End diff -- revert can throw an exception : which will cause other writers to not revert. We need to wrap it in try/catch, log and continue.
[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1678#discussion_r15682412 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
   override def isOpen: Boolean = objOut != null

-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }

-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
--- End diff -- In the use of writers in HashShuffleWriter, it is possible for a closed stream to be reverted (if some other stream's close failed, for example). In the above, that will leave this file with leftover data - I am not sure what the impact of this would be.
[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1678#discussion_r15682457 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter( override def isOpen: Boolean = objOut != null - override def commit(): Long = { + override def commitAndClose(): Unit = { --- End diff -- We should remove close from the interface, and make it private to this class btw.
[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1678#discussion_r15683205 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
   override def isOpen: Boolean = objOut != null

-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }

-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
--- End diff -- I meant the former case : close on a writer fails with an exception, while earlier streams succeeded. So now we have some writers which have committed data (which is not removed by the subsequent revert) while others are reverted. On the face of it, I agree, it should not cause issues : but since the expectation from this class is never enforced, it can silently fail.
[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1678#discussion_r15683224 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala --- @@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter( override def isOpen: Boolean = objOut != null - override def commit(): Long = { + override def commitAndClose(): Unit = { --- End diff -- When I merged the sort patch and modified EAOM, it was simply a matter of replacing close with commitAndClose. commitAndClose should be semantically equivalent to close actually. It is not equivalent to commit() - but we want to remove that :-)
[GitHub] spark pull request: [SPARK-2033] Automatically cleanup checkpoint
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/855#issuecomment-50895949 This definitely is much better, thanks for the PR !
[GitHub] spark pull request: SPARK-2532: Minimal shuffle consolidation fixe...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1678#discussion_r15701250 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
@@ -147,28 +147,36 @@ private[spark] class DiskBlockObjectWriter(
   override def isOpen: Boolean = objOut != null

-  override def commit(): Long = {
+  override def commitAndClose(): Unit = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      val prevPos = lastValidPosition
-      lastValidPosition = channel.position()
-      lastValidPosition - prevPos
-    } else {
-      // lastValidPosition is zero if stream is uninitialized
-      lastValidPosition
+      close()
     }
+    finalPosition = file.length()
   }

-  override def revertPartialWrites() {
-    if (initialized) {
-      // Discard current writes. We do this by flushing the outstanding writes and
-      // truncate the file to the last valid position.
-      objOut.flush()
-      bs.flush()
-      channel.truncate(lastValidPosition)
+  // Discard current writes. We do this by flushing the outstanding writes and then
+  // truncating the file to its initial position.
+  override def revertPartialWritesAndClose() {
+    try {
+      if (initialized) {
+        objOut.flush()
+        bs.flush()
+        close()
+      }
+
+      val truncateStream = new FileOutputStream(file, true)
+      try {
+        truncateStream.getChannel.truncate(initialPosition)
+      } finally {
+        truncateStream.close()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Uncaught exception while reverting partial writes to file " + file, e)
--- End diff -- Ah, did not notice that the if (initialized) did not include the truncate call !
[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1486#discussion_r15725601 --- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -243,10 +244,23 @@ class HadoopRDD[K, V](
     new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
   }

-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    // TODO: Filtering out localhost in case of file:// URLs
-    val hadoopSplit = split.asInstanceOf[HadoopPartition]
-    hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
+  override def getPreferredLocations(hsplit: Partition): Seq[String] = {
+    val split = hsplit.asInstanceOf[HadoopPartition].inputSplit.value
+    val locs = HadoopRDD.NEW_HADOOP_CLASSES match {
+      case Some(c) =>
+        try {
+          val lsplit = c.inputSplitWithLocationInfo.cast(split)
+          val infos = c.getLocationInfo.
+            invoke(lsplit).asInstanceOf[Array[Object]]
+          Some(HadoopRDD.convertSplitLocationInfo(infos))
--- End diff -- Given how frequently this is invoked, instead of using reflection, it is better to have an interface and a specific impl to handle this case (where the new hadoop APIs are called directly, not accessed via reflection). Of course, the initialization to use this new impl or None would be based on the existence of the methods - and so decided via reflection - but that would be a one-time operation (replacing NEW_HADOOP_CLASSES).
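A sketch of the suggested shape (InputSplitWithLocationInfo and getLocationInfo are the real Hadoop 2.x API; the resolver wiring itself is hypothetical): reflection runs exactly once to probe for the new API, and the steady-state path dispatches through a plain interface whose impl calls the API directly.

    import org.apache.hadoop.mapred.{InputSplit, InputSplitWithLocationInfo}

    trait SplitLocationResolver {
      def locations(split: InputSplit): Seq[String]
    }

    // Calls the new API directly - no reflection on the hot path. In a real
    // build this impl must live where it is only class-loaded after the
    // probe below succeeds.
    class NewApiResolver extends SplitLocationResolver {
      def locations(split: InputSplit): Seq[String] = split match {
        case s: InputSplitWithLocationInfo => s.getLocationInfo.map(_.getLocation).toSeq
        case _ => Seq.empty
      }
    }

    object SplitLocationResolver {
      // One-time reflective probe: decide availability here, then never reflect again.
      val instance: Option[SplitLocationResolver] =
        try {
          Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
          Some(new NewApiResolver)
        } catch {
          case _: ClassNotFoundException | _: NoClassDefFoundError => None
        }
    }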
[GitHub] spark pull request: SPARK-1767: Prefer HDFS-cached replicas when s...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1486#discussion_r15725610 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -216,6 +216,7 @@ abstract class RDD[T: ClassTag]( getPreferredLocations(split) } } +//out.map(PartitionLocation.fromString(_)) --- End diff -- please revert changes to this file
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1722#discussion_r15725631 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](
         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
         }
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {
--- End diff -- when objectsWritten == 0, writer != null will hold.
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1722#discussion_r15725641 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
    * An iterator that returns (K, C) pairs in sorted order from an on-disk map
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
-    extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    extends Iterator[(K, C)]
+  {
--- End diff -- Those asserts caught the bugs :-) But yeah, some of them might have been expensive.
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1722#discussion_r15725667 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
    * An iterator that returns (K, C) pairs in sorted order from an on-disk map
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
-    extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    extends Iterator[(K, C)]
+  {
+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _) // Size will be batchSize.length + 1
+    assert(file.length() == batchOffsets(batchOffsets.length - 1))
+
+    private var batchIndex = 0  // Which batch we're in
+    private var fileStream: FileInputStream = null

     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    private var batchStream = nextBatchStream()
-    private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-    private var deserializeStream = ser.deserializeStream(compressedStream)
+    private var deserializeStream = nextBatchStream()
     private var nextItem: (K, C) = null
     private var objectsRead = 0

     /**
      * Construct a stream that reads only from the next batch.
      */
-    private def nextBatchStream(): InputStream = {
-      if (batchSizes.length > 0) {
-        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+    private def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+      // we're still in a valid batch.
+      if (batchIndex < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }
+
+        val start = batchOffsets(batchIndex)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
+        batchIndex += 1
+
+        val end = batchOffsets(batchIndex)
+
+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
+
+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+        ser.deserializeStream(compressedStream)
--- End diff -- So that is something I was not sure of, particularly with kryo (not java). We were seeing the input buffer getting stepped on from various threads - this was specifically in the context of the 2G fixes though, where we had to modify the way the buffer was created anyway. I dont know if the initialization changes something else.
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1722#discussion_r15725700

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
@@ -455,7 +495,25 @@ class ExternalAppendOnlyMap[K, V, C](
     // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
-      deserializeStream.close()
+      batchIndex = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      val fs = fileStream
+      deserializeStream = null
+      fileStream = null
+
+      if (ds != null) {
+        try {
+          ds.close()
+        } catch {
+          case e: IOException =>
+            // Make sure we at least close the file handle
+            if (fs != null) {
+              try { fs.close() } catch { case e2: IOException => }
--- End diff --

This is just paranoid stuff; you can actually remove the catch IOException part. The reason it exists is because we have a ulimit'ed FileInputStream - which enforces a ulimit of 8k on spark (which is the fd limit in our clusters). For large reducers, this prevents the task from getting killed. So this is an attempt to ensure that the stream is truly closed. I did not realize this had leaked out.
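The belt-and-braces close being described - always release the underlying file descriptor even when the wrapping stream's close() throws - is a generic pattern. A minimal sketch (illustrative names, not the Spark API):

    import java.io.{Closeable, IOException}

    // Close the wrapping stream; if that throws, still close the raw file
    // stream so the fd is released (this matters under a tight ulimit).
    def closeDefensively(outer: Closeable, inner: Closeable): Unit = {
      try {
        outer.close()  // normally closes `inner` transitively
      } catch {
        case _: IOException =>
          try { inner.close() } catch { case _: IOException => }
      }
    }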
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1722#discussion_r15725724

--- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
@@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
   private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
   private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2

+  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+    val conf = new SparkConf(loadDefaults)
+    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+    conf.set("spark.serializer.objectStreamReset", "0")
+    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    // Ensure that we actually have multiple batches per spill file
+    conf.set("spark.shuffle.spill.batchSize", "10")
--- End diff --

Actually I should have made it 0 - to ensure a reset after each object (with the corresponding java serializer change to enable it). The point is to ensure that there is a TC_RESET after each object write and before close; otherwise the bug is not exposed.
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1722#discussion_r15725858

--- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- (same hunk as quoted above) --- End diff --

Oops, I misread. You are right, it looks fine (as long as batch size < 20, it should be exhibited).
[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15725875

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected executors)
+  // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+  var minRegisteredRatio =
+    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+  // Submit tasks after maxRegisteredWaitingTime milliseconds
+  // if minRegisteredRatio has not yet been reached
   val maxRegisteredWaitingTime =
-    conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+    conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
--- End diff --

Interface change ?
[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15725931

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- (same hunk as quoted above) --- End diff --

I think this was introduced in 1.1 ? If yes, OK to rename.
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1722#discussion_r15728047

--- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- (same hunk as quoted above) --- End diff --

BTW, was the java serializer fix also ported across ? I don't think @aarondav did that ... Else objectStreamReset = 0 will cause the serializer to ignore it (in master it used to check for value > 0).
[GitHub] spark pull request: [Minor] Fixes on top of #1679
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1736#discussion_r15728056

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala ---
@@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).sum
-      val remainingMem = storageStatusList.map(_.memRemaining).sum
-      (maxMem - remainingMem) / 1024 / 1024
+      val memUsed = storageStatusList.map(_.memUsed).sum
+      memUsed / 1024 / 1024
--- End diff --

nit: / (1024 * 1024) instead ?
[GitHub] spark pull request: [Minor] Fixes on top of #1679
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1736#discussion_r15732438

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala --- (same hunk as quoted above) --- End diff --

bad code is bad code :-)
[GitHub] spark pull request: [Minor] Fixes on top of #1679
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1736#discussion_r15732470

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala --- (same hunk as quoted above) --- End diff --

Btw, it is just a nit - so please don't let this block a commit !
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1722#issuecomment-50992153

LGTM, thanks Matei !

On 03-Aug-2014 12:13 pm, Matei Zaharia notificati...@github.com wrote:
> @aarondav / @mridulm any other comments on this, or is it okay to merge?
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1722#issuecomment-50992283

Oh wait, is the java serializer change also ported ? Else the tests won't do what we want them to do.

On 03-Aug-2014 8:11 pm, Mridul Muralidharan mri...@gmail.com wrote:
> LGTM, thanks Matei !
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1722#issuecomment-51003282

LGTM ! Though I would prefer if @aarondav also took a look at it - since this is based on my earlier work, I might be too close to it to see potential issues ...
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1722#discussion_r15750212

--- Diff: core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
@@ -35,16 +35,15 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
   /**
    * Calling reset to avoid memory leak:
    * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
-   * But only call it every 10,000th time to avoid bloated serialization streams (when
+   * But only call it every 100th time to avoid bloated serialization streams (when
    * the stream 'resets' object class descriptions have to be re-written)
    */
   def writeObject[T: ClassTag](t: T): SerializationStream = {
     objOut.writeObject(t)
+    counter += 1
     if (counterReset > 0 && counter >= counterReset) {
--- End diff --

This is the right behavior, but is a slight change ... I don't think anyone is expecting the earlier behavior though !
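The behavior change being flagged is subtle: the counter now increments before the check, so with counterReset = N the reset fires on the Nth write rather than the (N+1)th. A self-contained sketch of the policy (plain java.io, not the Spark class):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Write objects, emitting a TC_RESET every `counterReset` writes so the
    // stream drops its back-reference table (bounding memory, at the cost of
    // re-writing class descriptors after each reset).
    class ResettingObjectStream(counterReset: Int) {
      private val out = new ByteArrayOutputStream()
      private val objOut = new ObjectOutputStream(out)
      private var counter = 0

      def writeObject(t: AnyRef): Unit = {
        objOut.writeObject(t)
        counter += 1                                   // increment first ...
        if (counterReset > 0 && counter >= counterReset) {
          objOut.reset()                               // ... so the Nth write resets
          counter = 0
        }
      }
    }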
[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1722#issuecomment-51047651

LGTM !
[GitHub] spark pull request: [SPARK-2856] Decrease initial buffer size for ...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1780#issuecomment-51168402

IIRC if kryo can't hold the entire serialized object in the buffer, it throws up: we saw issues with it even as high as 256 kb for some of our jobs, though we were using a different api (and it was a slightly more complicated usecase). Might be a good idea to verify this before changing the default.
[GitHub] spark pull request: [SPARK-2503] Lower shuffle output buffer (spar...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1781#issuecomment-51169641

We are running this with 8k or so :-)
[GitHub] spark pull request: Expose aplication ID in ApplicationStart event...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/1218#discussion_r15827428

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1531,18 +1532,6 @@ object SparkContext extends Logging {
           throw new SparkException("YARN mode not available ?", e)
         }
       }
-      val backend = try {
-        val clazz =
-          Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
-        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
-        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
-      } catch {
-        case e: Exception = {
-          throw new SparkException("YARN mode not available ?", e)
-        }
-      }
-      scheduler.initialize(backend)
-      scheduler
--- End diff --

The reflection code exists since the class can't be loaded in all environments. Tom, has this changed recently?

On 05-Aug-2014 10:27 pm, Marcelo Vanzin notificati...@github.com wrote:
> Maybe I'm missing something, but this is just removing a bunch of reflection code and replacing it with a single line later on (in YarnClusterScheduler): initialize(new YarnClusterSchedulerBackend(this, sc)). This looks much, much easier to read and cleaner to me, but if you guys somehow feel so strongly about it, I can revert the change.
[GitHub] spark pull request: [SPARK-2856] Decrease initial buffer size for ...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1780#issuecomment-51269524

Hi @pwendell, my observation about buffer size was not in the context of spark ... we saw issues which looked like buffer overflow when the serialized object graph was large, and it was not handling the buffer growth properly. Fortunately, this was due to a bug in our code to begin with (the object being serialized was holding an unneeded reference to a large graph of objects - running into an MB or so), and so we did not need to pursue it much. But having seen something which should have been handled anyway, I want to make sure that changing the default does not cause surprises to our users. If there are issues with buffer growth, and we lower the limit, a lot of jobs will start failing on release. Given some of the past bugs we have fixed @pwendell (the flush issue comes to mind for example !), I am very wary of kryo - when it works, it is great, rest is suspicious :-)
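For reference, the knobs in question on the Spark 1.x line were the initial and maximum Kryo buffer sizes; the names below are from that era and should be treated as an assumption to verify against the release in use:

    import org.apache.spark.SparkConf

    // Sketch: enlarge Kryo's serialization buffer so large object graphs do
    // not overflow it (config names as of Spark 1.x; verify before relying).
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", "2")       // initial buffer, in MB
      .set("spark.kryoserializer.buffer.max.mb", "64")  // cap before Kryo throws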
[GitHub] spark pull request: Turn UpdateBlockInfo into case class.
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1872#issuecomment-51710872

If it is a case class, does it still need to be Externalizable ?
[GitHub] spark pull request: [SPARK-2952] Enable logging actor messages at ...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1870#issuecomment-51850204

Just saw this as part of the close, sorry for the late comment. Also, some of the INFO messages which are useful have now become DEBUG ? That makes it slightly harder to pinpoint an issue (ex: Stopping BlockManagerMaster). Spark logs are already hitting 60 gig for some of our jobs in DEBUG mode ! Btw, if we are logging all messages, would it not be better to use TRACE instead of DEBUG ?
[GitHub] spark pull request: [SPARK-2931] In TaskSetManager, reset currentL...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1896#issuecomment-51850469

It is just a modification of the test above it :-) Maybe some copy-paste error ? That line is not required for this test btw - just the last line validates the issue. We might want to add something to check that the new value for the index is 'sane' based on your change @JoshRosen ; that was not done in my test.
[GitHub] spark pull request: [SPARK-2952] Enable logging actor messages at ...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1870#issuecomment-51850828

Unfortunately, in most cases, we won't know what the issue is other than by bug hunting in the logs. So debug logging gets enabled for a wide swathe of packages.
[GitHub] spark pull request: [SPARK-2931] In TaskSetManager, reset currentL...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1896#issuecomment-51852483

It has to do with the FakeRackUtil used in that class. I guess not all tests clean it up properly after assigning to it, which is why host2 (or host1, depending on order) ends up being RACK_LOCAL. In our test, we are not expecting it to be part of any rack.
[GitHub] spark pull request: [SPARK-2931] In TaskSetManager, reset currentL...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1896#issuecomment-51852593

To be deterministic, you can add FakeRackUtil.cleanup at the beginning of this test too. Though ideally we should do it in the tests which add hosts to a rack (and not clean up in tests which don't care about it), I guess.
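A sketch of the suggested hygiene, assuming a mutable test helper along the lines of the FakeRackUtil under discussion (method names here are illustrative, and the test body is elided):

    // Any test that registers hosts with the shared rack-mapping helper
    // should reset it, so later tests never see stale host -> rack state.
    test("locality is recomputed after executor loss") {
      FakeRackUtil.cleanUp()                         // defensive: start clean
      FakeRackUtil.assignHostToRack("host1", "rack1")
      try {
        // ... exercise the TaskSetManager ...
      } finally {
        FakeRackUtil.cleanUp()                       // leave nothing behind
      }
    }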
[GitHub] spark pull request: [SPARK-3875] Add TEMP DIRECTORY configuration
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2729#issuecomment-58473957

At least for yarn, this will create issues if overridden from the default. Not sure about mesos. Why not use the standard Java property and define it for local and standalone mode where relevant?
[GitHub] spark pull request: [SPARK-3875] Add TEMP DIRECTORY configuration
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2729#issuecomment-58479810

There is a java property which controls this ... java.io.tmpdir

On 09-Oct-2014 1:22 pm, a GitHub user (notificati...@github.com) wrote:
> @mridulm Using the std java property is fine. Just add a more specific configuration argument.
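To illustrate the suggestion: java.io.tmpdir is a per-JVM property, so in Spark it would typically be routed through the extra-Java-options settings rather than a new Spark-specific config (a sketch, not taken from the PR):

    import org.apache.spark.SparkConf

    // Point JVM temp files away from /tmp using the standard property.
    val conf = new SparkConf()
      .set("spark.driver.extraJavaOptions", "-Djava.io.tmpdir=/data/tmp")
      .set("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=/data/tmp")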
[GitHub] spark pull request: [SPARK-3889] Attempt to avoid SIGBUS by not mm...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2742#issuecomment-58724312

This needs to be configurable ... IIRC 1.1 had this customizable. Different limits exist for vm vs heap memory in yarn (for example).
[GitHub] spark pull request: [SPARK-3889] Attempt to avoid SIGBUS by not mm...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2742#issuecomment-58728241

With 1.1, in experiments, we have done both: increasing it when our user code is mmap'ing too much data (so we pull things into heap .. using libraries not in our control :-) ), and decreasing it when heap is at a premium.

On 11-Oct-2014 4:42 am, Aaron Davidson notificati...@github.com wrote:
> @mridulm Could you give an example of which way you would want to shift it via config? Map more or less often?
[GitHub] spark pull request: [SPARK-3889] Attempt to avoid SIGBUS by not mm...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/2742#issuecomment-58728319

Note: this is required since there are heap and vm limits enforced, so we juggle available memory around so that jobs can run to completion!
[GitHub] spark pull request: SPARK-1937: fix issue with task locality
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/892#discussion_r13596860

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -153,8 +153,8 @@ private[spark] class TaskSetManager(
   }

   // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
-  val myLocalityLevels = computeValidLocalityLevels()
-  val localityWaits = myLocalityLevels.map(getLocalityWait)  // Time to wait at each level
+  var myLocalityLevels = computeValidLocalityLevels()
+  var localityWaits = myLocalityLevels.map(getLocalityWait)  // Time to wait at each level
--- End diff --

Just for clarity: this should end up with only ANY in the locality levels.
[GitHub] spark pull request: SPARK-1937: fix issue with task locality
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/892#discussion_r13597675

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -388,7 +386,7 @@ private[spark] class TaskSetManager(
     val curTime = clock.getTime()

     var allowedLocality = getAllowedLocalityLevel(curTime)
-    if (allowedLocality > maxLocality) {
+    if (allowedLocality > maxLocality && myLocalityLevels.contains(maxLocality)) {
       allowedLocality = maxLocality  // We're not allowed to search for farther-away tasks
--- End diff --

This is incorrect. Consider this: maxLocality == ANY, but allowedLocality == PROCESS_LOCAL (since sufficient time has not elapsed for relaxing further), and myLocalityLevels does not contain process local executors. Instead of not allowing any task to be executed, now we will allow scheduling of tasks with ANY preference.
[GitHub] spark pull request: SPARK-1937: fix issue with task locality
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/892#discussion_r13598180

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- (same hunk as quoted above) --- End diff --

Editing previous comment: Suppose maxLocality == PROCESS_LOCAL while allowedLocality == RACK_LOCAL (by virtue of having waited long enough for a free executor). Instead of restricting the schedule to up to PROCESS_LOCAL, this will now relax it all the way to RACK_LOCAL - which is incorrect (myLocalityLevels might not have PROCESS_LOCAL but could have NODE_LOCAL, for example).
[GitHub] spark pull request: SPARK-1937: fix issue with task locality
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/892#discussion_r13601836

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- (same hunk as quoted above) --- End diff --

Back at my desktop, so I can elaborate better. The issue is, with a relaxed constraint for the executor, some task might get scheduled to it which would have been better scheduled at some other executor. Simple scenario extending my earlier example: suppose there is only one task t1 left and two executors become available. Suppose for exec1 it is RACK_LOCAL while for exec2 it is NODE_LOCAL. We start with PROCESS_LOCAL as maxLocality - and suppose enough time had elapsed so allowedLocality == RACK_LOCAL or ANY. In this case, if resourceOffer is called on exec1 first, we get a RACK_LOCAL schedule. If resourceOffer was called on exec2 first, we get a NODE_LOCAL schedule. The reason for that if condition was exactly to prevent this. I am actually surprised I did not have any testcase to catch this ...
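The invariant being defended here can be stated compactly: the delay-scheduling level may only ever be tightened to the offer's maxLocality, never bypassed. A sketch using the same ordering as Spark's TaskLocality enumeration (illustrative, not the patched code):

    object TaskLocality extends Enumeration {
      // Declaration order makes PROCESS_LOCAL < NODE_LOCAL < RACK_LOCAL < ANY
      val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
    }

    // Clamp unconditionally: even if this TaskSet has no tasks at
    // maxLocality, answering a PROCESS_LOCAL offer at RACK_LOCAL can steal
    // a task that a NODE_LOCAL executor would otherwise have run.
    def effectiveLocality(
        allowed: TaskLocality.Value,
        max: TaskLocality.Value): TaskLocality.Value = {
      if (allowed > max) max else allowed
    }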
[GitHub] spark pull request: SPARK-1937: fix issue with task locality
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/892#issuecomment-45732793

Just wanted to drop a quick note (since I might not be able to get to this until late next week). I think the proposal should work, though I might be missing something, so will let others opine.
[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-45779752

This one slipped off my radar, my apologies. @tgravescs In #892, if there is even a single executor which is process local with any partition, then we start waiting for all levels based on configured timeouts. Here we are trying to ensure there are sufficient executors available before we start accepting jobs. The intent is slightly different.
[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-45780405

Hit submit by mistake; to continue ... The side effects of not having sufficient executors are different from #892. For example:
a) the default parallelism in yarn is based on the number of executors,
b) the number of intermediate files per node for shuffle (this can bring the node down btw),
c) the amount of memory consumed on a node for rdd MEMORY persisted data (making the job fail if disk is not specified - like some of the mllib algos ?),
and so on ...
[GitHub] spark pull request: Just a POC for having compression for every RD...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1091#issuecomment-46125063

Use spark.rdd.compress = true for compressing serialized RDD.
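A minimal usage sketch (the flag only affects serialized storage levels):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext(new SparkConf()
      .setMaster("local[2]")
      .setAppName("rdd-compress-demo")
      .set("spark.rdd.compress", "true"))   // compress serialized partitions

    val data = sc.parallelize(1 to 1000000)
    data.persist(StorageLevel.MEMORY_ONLY_SER)  // stored compressed
    println(data.count())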
[GitHub] spark pull request: Compression should be a setting for individual...
Github user mridulm commented on the pull request: https://github.com/apache/spark/pull/1091#issuecomment-46481279

Misread the PR and confused it with another pull request, ignore my earlier comment.