otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r640855529
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -824,17 +919,17 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile)
       throws IOException {
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
-      AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId,
-          File dataFile, File indexFile, File metaFile) throws IOException {
+      AppShufflePartitionInfo newAppShufflePartitionInfo(AppAttemptShuffleId appAttemptShuffleId, int reduceId,
+        File dataFile, File indexFile, File metaFile) throws IOException {
Review comment:
Nit: indentation
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -61,6 +61,17 @@
   private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class);
   private final String TEST_APP = "testApp";
+  private final String MERGE_DIRECTORY = "merge_directory";
+  private final int NON_ATTEMPTID = -1;
Review comment:
Nit: NO_ATTEMPT_ID
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -149,10 +160,10 @@ public void testFinalizeWithMultipleReducePartitions() throws IOException {
   @Test
   public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, -1, 0, 0, 0, 0));
Review comment:
Nit: Use NO_ATTEMPT_ID here and at the other places.
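For example (a sketch of the suggested change; the constant name follows the earlier nit and is not yet in the PR):

```java
private static final int NO_ATTEMPT_ID = -1;

@Test
public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
  StreamCallbackWithID stream1 =
    pushResolver.receiveBlockDataAsStream(
      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
  // ...
}
```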
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -117,64 +123,85 @@ public ShuffleIndexInformation load(File file) throws IOException {
    * shuffle does not exist, initializes the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      AppAttemptShuffleId appAttemptShuffleId,
+      AppAttemptPathsInfo appAttemptPathsInfo,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
+    File dataFile = getMergedShuffleDataFile(
+      appAttemptPathsInfo, appAttemptShuffleId.appId, appAttemptShuffleId.shuffleId, reduceId);
+    if (!partitions.containsKey(appAttemptShuffleId) && dataFile.exists()) {
       // If this partition is already finalized then the partitions map will not contain
       // the appShuffleId but the data file would exist. In that case the block is considered late.
       return null;
     }
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap());
-    return shufflePartitions.computeIfAbsent(reduceId, key -> {
-      // It only gets here when the key is not present in the map. This could either
-      // be the first time the merge manager receives a pushed block for a given application
-      // shuffle partition, or after the merged shuffle file is finalized. We handle these
-      // two cases accordingly by checking if the file already exists.
-      File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
-      File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
-      try {
-        if (dataFile.exists()) {
-          return null;
-        } else {
-          return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile);
-        }
-      } catch (IOException e) {
-        logger.error(
-          "Cannot create merged shuffle partition with data file {}, index file {}, and "
-            + "meta file {}", dataFile.getAbsolutePath(),
-          indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
-        throw new RuntimeException(
-          String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s "
-            + "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId), e);
+    // While processing the application remove, where the shuffle partitions info for the specific
+    // application will be cleaned up, this method will still be called to create new partitions
+    // as of receiving the push blocks. To avoid the potential memory leak, before creating the
+    // empty hashmap for storing the shuffle partitions information in the partitions hashmap,
+    // we need to make sure that the entry for the specific application must still exist in
+    // appAttemptsPathInfo hashmap. Otherwise, the push blocks should be ignored.
Review comment:
I don't see how this completely avoids the issue. There can be a context switch
after line 150, and another thread can then call `applicationRemoved`. I don't
think we should try to solve this issue in this PR; it is a problem with the
existing code, so why not create a separate jira for it? The goal of this jira
is to add support in DiskBlockManager to serve local merged data, and we keep
deviating from it.
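For the record, a minimal sketch of the check-then-act race (simplified types for illustration; the real maps are keyed by `AppAttemptShuffleId` and hold per-reduce partition maps):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class RaceSketch {
  private final ConcurrentMap<String, Object> appAttemptsPathsInfo = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, Object> partitions = new ConcurrentHashMap<>();

  Object getOrCreate(String appId) {
    if (!appAttemptsPathsInfo.containsKey(appId)) {
      return null; // app already removed, ignore the pushed block
    }
    // A context switch here lets another thread run applicationRemoved(appId),
    // clearing both maps; the computeIfAbsent below then re-inserts an entry
    // for the removed application, which is exactly the leak the guard above
    // is meant to prevent.
    return partitions.computeIfAbsent(appId, id -> new Object());
  }

  void applicationRemoved(String appId) {
    appAttemptsPathsInfo.remove(appId);
    partitions.remove(appId);
  }
}
```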
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -290,16 +343,63 @@ void deleteExecutorDirs(Path[] dirs) {
     }
   }

+  /**
+   * Create StreamCallback for invalid push blocks with the specific error message.
+   * If specific error message is null, this StreamCallback won't throw exception in client.
+   */
+  private StreamCallbackWithID createCallbackForInvalidPushBlocks(
+      String streamId,
+      String errorMessage) {
+    return new StreamCallbackWithID() {
+      @Override
+      public String getID() {
+        return streamId;
+      }
+
+      @Override
+      public void onData(String streamId, ByteBuffer buf) {
+        // Ignore the requests. It reaches here either when a request is received after the
+        // shuffle file is finalized or when a request is for a duplicate block.
+      }
+
+      @Override
+      public void onComplete(String streamId) {
+        if (errorMessage != null) {
+          // Throw an exception here so the block data is drained from channel and server
+          // responds RpcFailure to the client.
+          throw new RuntimeException(String.format("Block %s %s", streamId, errorMessage));
+        }
+        // For duplicate block that is received before the shuffle merge finalizes, the
+        // server should respond success to the client.
+      }
+
+      @Override
+      public void onFailure(String streamId, Throwable cause) {
+      }
+    };
+  }
+
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     // Retrieve merged shuffle file metadata
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
+    AppAttemptPathsInfo appAttemptPathsInfo = getAppAttemptPathsInfo(msg.appId);
+    final String streamId = String.format("%s_%d_%d_%d",
+      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.mapIndex, msg.reduceId);
+    AppAttemptShuffleId appAttemptShuffleId =
+      new AppAttemptShuffleId(msg.appId, msg.attemptId, msg.shuffleId);
+    if (appAttemptPathsInfo.attemptId != appAttemptShuffleId.attemptId) {
+      // If this Block belongs to a former application attempt, it is considered late,
+      // as only the blocks from the current application attempt will be merged
+      return createCallbackForInvalidPushBlocks(streamId,
+        ErrorHandler.BlockPushErrorHandler.NEWER_ATTEMPT_HAS_STARTED_MESSAGE_SUFFIX);
Review comment:
Please add a TODO, as well as a jira ticket, noting that the client needs to
handle this failure.
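Something like this would do (a sketch only; the jira id is a placeholder for the ticket to be filed, not an existing one):

```java
if (appAttemptPathsInfo.attemptId != appAttemptShuffleId.attemptId) {
  // TODO(SPARK-XXXXX): the client does not yet handle this failure and should
  // stop pushing blocks for the stale attempt once it sees this message.
  return createCallbackForInvalidPushBlocks(streamId,
    ErrorHandler.BlockPushErrorHandler.NEWER_ATTEMPT_HAS_STARTED_MESSAGE_SUFFIX);
}
```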
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -52,6 +53,14 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

+  /**
+   * Create merge directories
+   */
+  createLocalDirsForMergedShuffleBlocks(conf)
+
+  private[spark] lazy val activeMergedShuffleDirs: Option[Array[File]] =
+    findActiveMergedShuffleDirs(conf)
+
Review comment:
If this executor is not creating the merged directory, then currently it
seems that activeMergedShuffleDirs will be empty. Check line 216.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
##########
@@ -35,7 +35,11 @@
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager) that the executor is using. */
+  /** Shuffle manager (SortShuffleManager) that the executor is using.
Review comment:
Nit: Since the comment is no longer one line, start its text on the next line.
Also fix line 42.
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -61,6 +61,17 @@
   private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class);
   private final String TEST_APP = "testApp";
+  private final String MERGE_DIRECTORY = "merge_directory";
+  private final int NON_ATTEMPTID = -1;
+  private final int ATTEMPTID_1 = 1;
Review comment:
Nit: ATTEMPT_ID_1
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -336,51 +436,26 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null
       && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck;
-    final String streamId = String.format("%s_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex,
-      msg.reduceId);
     if (partitionInfo != null) {
       return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex);
     } else {
       // For a duplicate block or a block which is late, respond back with a callback that handles
       // them differently.
-      return new StreamCallbackWithID() {
-        @Override
-        public String getID() {
-          return streamId;
-        }
-
-        @Override
-        public void onData(String streamId, ByteBuffer buf) {
-          // Ignore the requests. It reaches here either when a request is received after the
-          // shuffle file is finalized or when a request is for a duplicate block.
-        }
-
-        @Override
-        public void onComplete(String streamId) {
-          if (isTooLate) {
-            // Throw an exception here so the block data is drained from channel and server
-            // responds RpcFailure to the client.
-            throw new RuntimeException(String.format("Block %s %s", streamId,
-              ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
-          }
-          // For duplicate block that is received before the shuffle merge finalizes, the
-          // server should respond success to the client.
-        }
-
-        @Override
-        public void onFailure(String streamId, Throwable cause) {
-        }
-      };
+      if (isTooLate) {
Review comment:
Can you please explain why you have broken this down into an if/else here?
What will the client do when it receives a `null` as a failure message?
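For reference, my reading of the new control flow, based on the `createCallbackForInvalidPushBlocks` helper quoted earlier (the else branch is inferred from that helper's javadoc; it is cut off in this hunk):

```java
if (isTooLate) {
  // Late block: a non-null message makes onComplete throw, so the server
  // replies RpcFailure to the client.
  return createCallbackForInvalidPushBlocks(streamId,
    ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX);
} else {
  // Duplicate block: a null message means onComplete returns normally and
  // the client sees success.
  return createCallbackForInvalidPushBlocks(streamId, null);
}
```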
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
##########
@@ -40,5 +41,15 @@ trait ShuffleBlockResolver {
    */
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer

+  /**
+   * Retrieve the data for the specified merged shuffle block as multiple chunks.
+   */
+  def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer]
Review comment:
All of these are missing the `dirs` parameter
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -52,6 +53,14 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

+  /**
+   * Create merge directories
+   */
+  createLocalDirsForMergedShuffleBlocks(conf)
+
+  private[spark] lazy val activeMergedShuffleDirs: Option[Array[File]] =
+    findActiveMergedShuffleDirs(conf)
+
Review comment:
BTW this is no longer needed. The dirs are passed to the getMerged...
APIs.
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +736,23 @@ private[spark] class BlockManager(
     }
   }

+  /**
+   * Get the local merged shuffle block data for the given block ID as multiple chunks.
+   * A merged shuffle file is divided into multiple chunks according to the index file.
+   * Instead of reading the entire file as a single block, we split it into smaller chunks
+   * which will be memory efficient when performing certain operations.
+   */
+  override def getLocalMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] = {
Review comment:
This is also the older API. @zhouyejoe We need to add the dirs. Please
look at these APIs in https://github.com/apache/spark/pull/32140/
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +101,34 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   def getFile(blockId: BlockId): File = getFile(blockId.name)

+  /**
+   * This should be in sync with
+   * @see [[org.apache.spark.network.shuffle.RemoteBlockPushResolver#getFile(java.lang.String,
+   * java.lang.String)]]
+   */
+  def getMergedShuffleFile(blockId: BlockId): File = {
Review comment:
@zhouyejoe This is outdated code. We need to pass the `dirs: Option[Array[String]]`
to this method to support discovering the location of merged block directories
when dynamic resource allocation is enabled.
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -415,24 +490,65 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]),
         Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+    partitions.remove(appAttemptShuffleId);
+    logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }

   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-        executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs
+          appAttemptsPathsInfo.computeIfAbsent(appId, id ->
+            new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId, executorInfo.localDirs,
+              mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt wil register
+          // the merge dirs in External Shuffle Service. Any later ExecutorRegister message
+          // from the same application attempt will not override the merge dirs. But it can
+          // be overridden by ExecutorRegister message from newer application attempt,
+          // and former attempts' shuffle partitions information will also be cleaned up.
+          AtomicBoolean newAttemptRegistered = new AtomicBoolean(false);
+          appAttemptsPathsInfo.compute(appId, (id, appAttemptPathsInfo) -> {
+            if (appAttemptPathsInfo == null || (appAttemptPathsInfo != null
+                && mergeDirectoryMeta.attemptId > appAttemptPathsInfo.attemptId)) {
+              appAttemptPathsInfo =
+                new AppAttemptPathsInfo(
+                  appId, mergeDirectoryMeta.attemptId, executorInfo.localDirs,
+                  mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir);
+              newAttemptRegistered.set(true);
+            }
+            return appAttemptPathsInfo;
+          });
+          if (newAttemptRegistered.get()) {
+            logger.warn("Remove shuffle partition info for {}_{} generated for former attempts",
+              appId, mergeDirectoryMeta.attemptId);
+            cleanupShufflePartitionInfo(appId, mergeDirectoryMeta.attemptId);
Review comment:
`cleanupShufflePartitionInfo` iterates over the whole `partitions` map
so we have increased the latency of registration. This cleanup can happen
asynchronously. Please add a TODO and create a jira for it as well.
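A minimal sketch of what the asynchronous cleanup could look like (the dedicated executor and the simplified map are assumptions for illustration, not part of this PR; the real keys are AppAttemptShuffleId):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncCleanupSketch {
  // Stand-in for the resolver's partitions map.
  private final ConcurrentMap<String, Object> partitions = new ConcurrentHashMap<>();
  // Hypothetical single-threaded executor dedicated to cleanup work.
  private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

  void onNewAttemptRegistered(String appId, int attemptId) {
    // registerExecutor returns immediately; the O(partitions) scan runs off
    // the registration path.
    cleanupExecutor.execute(() -> cleanupShufflePartitionInfo(appId, attemptId));
  }

  private void cleanupShufflePartitionInfo(String appId, int attemptId) {
    // Simplified stand-in for the real method: drop entries that belong to
    // former attempts of this application.
    partitions.keySet().removeIf(key -> key.startsWith(appId + "_"));
  }
}
```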
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]