mridulm commented on a change in pull request #32287:
URL: https://github.com/apache/spark/pull/32287#discussion_r618106484
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -683,7 +694,28 @@ final class ShuffleBlockFetcherIterator(
}
}
- case FailureFetchResult(blockId, mapIndex, address, e) =>
+ // Catching OOM and do something based on it is only a workaround for handling the
+ // Netty OOM issue, which is not the best way towards memory management. We can
+ // get rid of it when we find a way to manage Netty's memory precisely.
+ case FailureFetchResult(blockId, mapIndex, address, size, isNetworkReqDone, e)
+ if e.isInstanceOf[OutOfDirectMemoryError] || e.isInstanceOf[NettyOutOfMemoryError] =>
+ assert(address != blockManager.blockManagerId &&
+ !hostLocalBlocks.contains(blockId -> mapIndex),
+ "Netty OOM error should only happen on remote fetch requests")
+ logWarning(s"Failed to fetch block $blockId due to Netty OOM, will retry", e)
+ NettyUtils.isNettyOOMOnShuffle = true
+ numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+ bytesInFlight -= size
+ if (isNetworkReqDone) {
+ reqsInFlight -= 1
+ logDebug("Number of requests in flight " + reqsInFlight)
+ }
+ val defReqQueue =
+ deferredFetchRequests.getOrElseUpdate(address, new Queue[FetchRequest]())
+ defReqQueue.enqueue(FetchRequest(address, Array(FetchBlockInfo(blockId, size, mapIndex))))
Review comment:
How many times will we retry? I don't see an upper bound on the re-enqueue in this branch.
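To make the concern concrete, here is a rough sketch of a per-block retry cap. Everything in it is hypothetical: `OomRetryBudget`, `tryAcquire`, and the idea of keying by a block id string are assumptions for illustration, not anything in this PR. It is written in Java only to keep it self-contained, and it is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-block retry budget; the class name and the cap are assumptions.
final class OomRetryBudget {
  private final int maxAttempts;
  private final Map<String, Integer> attempts = new HashMap<>();

  OomRetryBudget(int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    this.maxAttempts = maxAttempts;
  }

  // Records one more OOM for the block. Returns true while the block may still be
  // re-enqueued, and false once the budget is used up and the failure should surface.
  boolean tryAcquire(String blockId) {
    int used = attempts.merge(blockId, 1, Integer::sum);
    return used <= maxAttempts;
  }
}
```

With something like this, the OOM branch would re-enqueue only when `tryAcquire` returns true and otherwise fall through to the regular fetch-failure handling.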
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
##########
@@ -37,6 +37,14 @@
*/
public class NettyUtils {
+ /**
+ * A flag which indicates whether the Netty OOM error has raised during shuffle.
+ * If true, unless there's no in-flight fetch requests, all the pending shuffle
+ * fetch requests will be deferred until the flag is unset (whenever there's a
+ * complete fetch request).
+ */
+ public static volatile boolean isNettyOOMOnShuffle = false;
Review comment:
Super nit: make it an `AtomicBoolean` instead?
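A minimal sketch of what that could look like, assuming the flag is wrapped behind small accessors. `ShuffleOomFlag` and its method names are hypothetical and not part of the PR; the only real API used is `java.util.concurrent.atomic.AtomicBoolean`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical holder for the flag; the name is illustrative, not from the PR.
final class ShuffleOomFlag {
  private static final AtomicBoolean nettyOomOnShuffle = new AtomicBoolean(false);

  private ShuffleOomFlag() {}

  // Returns true only for the caller that actually flipped the flag from false to true.
  static boolean markOom() {
    return nettyOomOnShuffle.compareAndSet(false, true);
  }

  // Returns true only for the caller that actually cleared the flag.
  static boolean clearOom() {
    return nettyOomOnShuffle.compareAndSet(true, false);
  }

  static boolean isOom() {
    return nettyOomOnShuffle.get();
  }
}
```

For plain set/get, a `volatile` field already gives the same visibility. `compareAndSet` only helps if callers need to know whether they were the one to change the state, for example to log the transition once.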
##########
File path:
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -801,6 +838,8 @@ private class BufferReleasingInputStream(
if (!closed) {
delegate.close()
iterator.releaseCurrentResultBuffer()
+ // Unset the flag when a remote request finished.
+ if (isNetworkReqDone) NettyUtils.isNettyOOMOnShuffle = false
closed = true
Review comment:
Move this into a try/finally? Exceptions thrown from close can get ignored by callers, in which
case `isNettyOOMOnShuffle` will never get unset.
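Roughly what I have in mind, as a sketch only. `FlagClearingInputStream` is a hypothetical stand-in for `BufferReleasingInputStream`, the flag is passed in as an `AtomicBoolean` purely to keep the example self-contained, and the buffer release step is left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for BufferReleasingInputStream; names are illustrative.
final class FlagClearingInputStream extends InputStream {
  private final InputStream delegate;
  private final AtomicBoolean oomFlag;
  private final boolean isNetworkReqDone;
  private boolean closed = false;

  FlagClearingInputStream(InputStream delegate, AtomicBoolean oomFlag, boolean isNetworkReqDone) {
    this.delegate = delegate;
    this.oomFlag = oomFlag;
    this.isNetworkReqDone = isNetworkReqDone;
  }

  @Override
  public int read() throws IOException {
    return delegate.read();
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      try {
        delegate.close();
      } finally {
        // Runs even if delegate.close() throws, so the flag cannot stay set forever.
        if (isNetworkReqDone) {
          oomFlag.set(false);
        }
        closed = true;
      }
    }
  }
}
```

The exception from `delegate.close()` still propagates to the caller; the only change is that the flag reset and the `closed` update no longer depend on the close succeeding.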
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/util/NettyOutOfMemoryError.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+/**
+ * This class is for test only. It's needed because neither the constructor of Netty's
+ * {@link io.netty.util.internal.OutOfDirectMemoryError} can be accessed nor be mocked
+ * due to the type is declared as final.
+ */
+public class NettyOutOfMemoryError extends OutOfMemoryError {
Review comment:
Nit: rename it to make it clear this is just for tests?
Thought: can we use `SparkOutOfMemoryError` instead? We could move it out to
a module accessible from here as well.