This is an automated email from the ASF dual-hosted git repository.

slfan1989 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1ce273150c0 MAPREDUCE-7457: Added support to limit count of spill files (#6155) Contributed by Mudit Sharma.
f1ce273150c0 is described below

commit f1ce273150c0282c39b74df4e2a0c067df5a5e5c
Author: mudit1289 <50325057+mudit1...@users.noreply.github.com>
AuthorDate: Tue Oct 31 04:28:22 2023 +0530

    MAPREDUCE-7457: Added support to limit count of spill files (#6155) Contributed by Mudit Sharma.
    
    Reviewed-by: Shilun Fan <slfan1...@apache.org>
    Signed-off-by: Shilun Fan <slfan1...@apache.org>
---
 .../java/org/apache/hadoop/mapred/MapTask.java     | 31 ++++++++-
 .../org/apache/hadoop/mapreduce/MRJobConfig.java   |  1 +
 .../src/main/resources/mapred-default.xml          |  9 +++
 .../java/org/apache/hadoop/mapred/TestMapTask.java | 78 ++++++++++++++++++++++
 4 files changed, 116 insertions(+), 3 deletions(-)

diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 06d9fbbe7a32..4f86f912838f 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -955,7 +955,10 @@ public class MapTask extends Task {
       new ArrayList<SpillRecord>();
     private int totalIndexCacheMemory;
     private int indexCacheMemoryLimit;
+    private int spillFilesCountLimit;
     private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
+    private static final int SPILL_FILES_COUNT_LIMIT_DEFAULT = -1;
+    private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;
 
     private MapTask mapTask;
     private MapOutputFile mapOutputFile;
@@ -984,10 +987,17 @@ public class MapTask extends Task {
           MRJobConfig.DEFAULT_IO_SORT_MB);
       indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                          INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
+      spillFilesCountLimit = job.getInt(JobContext.SPILL_FILES_COUNT_LIMIT,
+          SPILL_FILES_COUNT_LIMIT_DEFAULT);
       if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
             "\": " + spillper);
       }
+      if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
+          && spillFilesCountLimit < 0) {
+        throw new IOException("Invalid value for \"" + JobContext.SPILL_FILES_COUNT_LIMIT + "\", " +
+            "current value: " + spillFilesCountLimit);
+      }
       if ((sortmb & 0x7FF) != sortmb) {
         throw new IOException(
             "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
@@ -1698,7 +1708,7 @@ public class MapTask extends Task {
             spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
         }
         LOG.info("Finished spill " + numSpills);
-        ++numSpills;
+        incrementNumSpills();
       } finally {
         if (out != null) out.close();
         if (partitionOut != null) {
@@ -1774,7 +1784,7 @@ public class MapTask extends Task {
           totalIndexCacheMemory +=
             spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
         }
-        ++numSpills;
+        incrementNumSpills();
       } finally {
         if (out != null) out.close();
         if (partitionOut != null) {
@@ -2022,7 +2032,7 @@ public class MapTask extends Task {
       if (!dst.getParentFile().exists()) {
         if (!dst.getParentFile().mkdirs()) {
           throw new IOException("Unable to rename " + src + " to "
-              + dst + ": couldn't create parent directory"); 
+              + dst + ": couldn't create parent directory");
         }
       }
       
@@ -2030,6 +2040,21 @@ public class MapTask extends Task {
         throw new IOException("Unable to rename " + src + " to " + dst);
       }
     }
+
+    /**
+     * Increments numSpills local counter by taking into consideration
+     * the max limit on spill files being generated by the job.
+     * If limit is reached, this function throws an IOException
+     */
+    private void incrementNumSpills() throws IOException {
+      ++numSpills;
+      if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
+          && numSpills > spillFilesCountLimit) {
+        throw new IOException("Too many spill files got created, control it with " +
+            "mapreduce.task.spill.files.count.limit, current value: " + spillFilesCountLimit +
+            ", current spill count: " + numSpills);
+      }
+    }
   } // MapOutputBuffer
   
   /**
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 8ec984e777bb..289159ad922a 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -323,6 +323,7 @@ public interface MRJobConfig {
   public static final int DEFAULT_IO_SORT_MB = 100;
 
   public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
+  String SPILL_FILES_COUNT_LIMIT = "mapreduce.task.spill.files.count.limit";
 
   public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";
 
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 9b0d8b563d7b..ca144a7b156c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -62,6 +62,15 @@
   set to less than .5</description>
 </property>
 
+<property>
+  <name>mapreduce.task.spill.files.count.limit</name>
+  <value>-1</value>
+  <description>Number of spill files that can be created by a MapTask.
+  After breaching this, task will fail. Default value for this config is -1
+  which indicates that there is no limit on number of spill files being
+  created</description>
+</property>
+
 <property>
   <name>mapreduce.job.local-fs.single-disk-limit.bytes</name>
   <value>-1</value>
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java
index d5164de46d12..fef179994f09 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java
@@ -27,14 +27,20 @@ import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.MapTask.MapOutputBuffer;
 import org.apache.hadoop.mapred.Task.TaskReporter;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.Progress;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -51,6 +57,9 @@ public class TestMapTask {
     FileUtil.fullyDelete(TEST_ROOT_DIR);
   }
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   // Verify output files for shuffle have group read permission even when
   // the configured umask normally would prevent it.
   @Test
@@ -84,4 +93,73 @@ public class TestMapTask {
     Assert.assertEquals("Incorrect index file perms",
         (short)0640, perms.toShort());
   }
+
+  @Test
+  public void testSpillFilesCountLimitInvalidValue() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
+    conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, -2);
+    MapOutputFile mof = new MROutputFiles();
+    mof.setConf(conf);
+    TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
+    MapTask mockTask = mock(MapTask.class);
+    doReturn(mof).when(mockTask).getMapOutputFile();
+    doReturn(attemptId).when(mockTask).getTaskID();
+    doReturn(new Progress()).when(mockTask).getSortPhase();
+    TaskReporter mockReporter = mock(TaskReporter.class);
+    doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
+    MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
+    MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
+
+    exception.expect(IOException.class);
+    exception.expectMessage("Invalid value for \"mapreduce.task.spill.files.count.limit\", " +
+        "current value: -2");
+
+    mob.init(ctx);
+    mob.close();
+  }
+
+  @Test
+  public void testSpillFilesCountBreach() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
+    conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 2);
+    MapOutputFile mof = new MROutputFiles();
+    mof.setConf(conf);
+    TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
+    MapTask mockTask = mock(MapTask.class);
+    doReturn(mof).when(mockTask).getMapOutputFile();
+    doReturn(attemptId).when(mockTask).getTaskID();
+    doReturn(new Progress()).when(mockTask).getSortPhase();
+    TaskReporter mockReporter = mock(TaskReporter.class);
+    doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
+    MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
+    MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
+    mob.numSpills = 2;
+    mob.init(ctx);
+
+    Method method = mob.getClass().getDeclaredMethod("incrementNumSpills");
+    method.setAccessible(true);
+    boolean gotExceptionWithMessage = false;
+    try {
+      method.invoke(mob);
+    } catch(InvocationTargetException e) {
+      Throwable targetException = e.getTargetException();
+      if (targetException != null) {
+        String errorMessage = targetException.getMessage();
+        if (errorMessage != null) {
+          if(errorMessage.equals("Too many spill files got created, control it with " +
+              "mapreduce.task.spill.files.count.limit, current value: 2, current spill count: 3")) {
+            gotExceptionWithMessage = true;
+          }
+        }
+      }
+    }
+
+    mob.close();
+
+    Assert.assertTrue(gotExceptionWithMessage);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to