MAPREDUCE-7148. Fast fail jobs when exceeds dfs quota limitation. Contributed by Wang Yan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0b6625a9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0b6625a9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0b6625a9

Branch: refs/heads/HDFS-13891
Commit: 0b6625a9735f76ab473b41d8ab9b7f3c7678cfff
Parents: 8dc1f6d
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Nov 7 08:20:49 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Nov 7 08:20:49 2018 -0600

----------------------------------------------------------------------
 ...ClusterStorageCapacityExceededException.java |  51 ++++++++
 .../hdfs/protocol/QuotaExceededException.java   |   5 +-
 .../org/apache/hadoop/mapred/YarnChild.java     |  28 ++++-
 .../org/apache/hadoop/mapred/TestYarnChild.java | 118 +++++++++++++++++++
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   3 +
 .../src/main/resources/mapred-default.xml       |   9 ++
 6 files changed, 209 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b6625a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java
new file mode 100644
index 0000000..bbbf073
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ClusterStorageCapacityExceededException.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Exception raised by HDFS indicating that storage capacity in the
+ * cluster filesystem is exceeded. See also
+ * https://issues.apache.org/jira/browse/MAPREDUCE-7148.
+ */
+@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
+@InterfaceStability.Evolving
+public class ClusterStorageCapacityExceededException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public ClusterStorageCapacityExceededException() {
+    super();
+  }
+
+  public ClusterStorageCapacityExceededException(String message) {
+    super(message);
+  }
+
+  public ClusterStorageCapacityExceededException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+
+  public ClusterStorageCapacityExceededException(Throwable cause) {
+    super(cause);
+  }
+}
\ No newline at end of file
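
For orientation, an illustrative sketch (not part of this commit) of how client code could catch the new exception type around an HDFS write; the target path and payload are hypothetical, only the exception class comes from this change:

    import java.io.OutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CapacityAwareWrite {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Hypothetical target path; a write that trips a quota can raise the exception.
        try (OutputStream out = fs.create(new Path("/tmp/capacity-demo"))) {
          out.write(new byte[]{1, 2, 3});
        } catch (ClusterStorageCapacityExceededException e) {
          // After this commit, this also catches QuotaExceededException and its subclasses.
          System.err.println("Cluster storage capacity exceeded: " + e.getMessage());
        }
      }
    }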

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b6625a9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
index f4e7f34..7033f3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
@@ -18,10 +18,9 @@
 
 package org.apache.hadoop.hdfs.protocol;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
 
 /**
  * This exception is thrown when modification to HDFS results in violation
@@ -37,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class QuotaExceededException extends IOException {
+public class QuotaExceededException extends ClusterStorageCapacityExceededException {
   protected static final long serialVersionUID = 1L;
   protected String pathName=null;
   protected long quota; // quota
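
The practical effect of this one-line parent change, as a minimal sketch (class names as in the diff; the reflection check itself is just illustrative):

    import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
    import org.apache.hadoop.hdfs.protocol.QuotaExceededException;

    public class QuotaIsCapacity {
      public static void main(String[] args) {
        // Prints "true": quota violations are now a kind of capacity-exceeded error,
        // so a single catch of the parent type covers QuotaExceededException too.
        System.out.println(ClusterStorageCapacityExceededException.class
            .isAssignableFrom(QuotaExceededException.class));
      }
    }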

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b6625a9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index bd40e54..e81b090 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -26,10 +26,12 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.CallerContext;
@@ -61,6 +63,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The main() for MapReduce task processes.
  */
@@ -205,8 +209,7 @@ class YarnChild {
       // Report back any failures, for diagnostic purposes
       if (taskid != null) {
         if (!ShutdownHookManager.get().isShutdownInProgress()) {
-          umbilical.fatalError(taskid,
-              StringUtils.stringifyException(exception), false);
+          reportError(exception, task, umbilical);
         }
       }
     } catch (Throwable throwable) {
@@ -228,6 +231,27 @@ class YarnChild {
     }
   }
 
+  @VisibleForTesting
+  static void reportError(Exception exception, Task task,
+      TaskUmbilicalProtocol umbilical) throws IOException {
+    boolean fastFailJob = false;
+    boolean hasClusterStorageCapacityExceededException =
+        ExceptionUtils.indexOfType(exception,
+            ClusterStorageCapacityExceededException.class) != -1;
+    if (hasClusterStorageCapacityExceededException) {
+      boolean killJobWhenExceedClusterStorageCapacity = task.getConf()
+          .getBoolean(MRJobConfig.JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED,
+              MRJobConfig.DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED);
+      if (killJobWhenExceedClusterStorageCapacity) {
+        LOG.error(
+            "Fast fail the job because the cluster storage capacity was exceeded.");
+        fastFailJob = true;
+      }
+    }
+    umbilical.fatalError(taskid, StringUtils.stringifyException(exception),
+        fastFailJob);
+  }
+
   /**
   * Utility method to check if the Encrypted Spill Key needs to be set into the
    * user credentials of the user running the Map / Reduce Task
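
The detection in reportError hinges on ExceptionUtils.indexOfType from commons-lang3, which walks the cause chain and matches subclasses, so a capacity exception wrapped several levels deep (as the tests below exercise) is still found. A standalone sketch of that behavior, with arbitrary wrapping exceptions:

    import org.apache.commons.lang3.exception.ExceptionUtils;
    import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;

    public class CauseChainProbe {
      public static void main(String[] args) {
        Exception wrapped = new RuntimeException(new IllegalStateException(
            new ClusterStorageCapacityExceededException("quota exceeded")));
        // indexOfType returns the 0-based position in the cause chain, or -1 if absent;
        // here it prints 2, so reportError's "!= -1" test detects the nested failure.
        System.out.println(ExceptionUtils.indexOfType(wrapped,
            ClusterStorageCapacityExceededException.class));
      }
    }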

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b6625a9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java
new file mode 100644
index 0000000..404b1cb
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestYarnChild.java
@@ -0,0 +1,118 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the behavior of YarnChild.
+ */
+public class TestYarnChild {
+  private Task task;
+  private TaskUmbilicalProtocol umbilical;
+  private Configuration conf;
+  final static private String KILL_LIMIT_EXCEED_CONF_NAME =
+      "mapreduce.job.dfs.storage.capacity.kill-limit-exceed";
+
+  @Before
+  public void setUp() throws Exception {
+    task = mock(Task.class);
+    umbilical = mock(TaskUmbilicalProtocol.class);
+    conf = new Configuration();
+    when(task.getConf()).thenReturn(conf);
+  }
+
+  @Test
+  public void testReportErrorWhenCapacityExceptionNotHappenByDefault()
+      throws IOException {
+    Exception exception = new RuntimeException(new IOException());
+
+    verifyReportError(exception, false);
+  }
+
+  @Test
+  public void testReportErrorWhenCapacityExceptionNotHappenAndFastFailDisabled()
+      throws IOException {
+    Exception exception = new RuntimeException(new IOException());
+    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, false);
+
+    verifyReportError(exception, false);
+  }
+
+  @Test
+  public void testReportErrorWhenCapacityExceptionNotHappenAndFastFailEnabled()
+      throws IOException {
+    Exception exception = new RuntimeException(new IOException());
+    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);
+
+    verifyReportError(exception, false);
+  }
+
+  @Test
+  public void testReportErrorWhenCapacityExceptionHappenByDefault()
+      throws IOException {
+    Exception exception =
+        new RuntimeException(new ClusterStorageCapacityExceededException());
+
+    verifyReportError(exception, false);
+  }
+
+  @Test
+  public void testReportErrorWhenCapacityExceptionHappenAndFastFailDisabled()
+      throws IOException {
+    Exception exception =
+        new RuntimeException(new ClusterStorageCapacityExceededException());
+    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, false);
+
+    verifyReportError(exception, false);
+  }
+
+  @Test
+  public void testReportErrorWhenCapacityExceptionHappenAndFastFailEnabled()
+      throws IOException {
+    Exception exception =
+        new RuntimeException(new ClusterStorageCapacityExceededException());
+    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);
+
+    verifyReportError(exception, true);
+  }
+
+  @Test
+  public void testReportErrorWhenCapacityExceptionHappenInThirdOfExceptionChain()
+      throws IOException {
+    Exception exception = new RuntimeException(new IllegalStateException(
+        new ClusterStorageCapacityExceededException()));
+    conf.setBoolean(KILL_LIMIT_EXCEED_CONF_NAME, true);
+
+    verifyReportError(exception, true);
+  }
+
+  private void verifyReportError(Exception exception, boolean fastFail)
+      throws IOException {
+    YarnChild.reportError(exception, task, umbilical);
+    verify(umbilical).fatalError(any(TaskAttemptID.class), anyString(),
+        eq(fastFail));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b6625a9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 3592b3d..565c052 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -57,6 +57,9 @@ public interface MRJobConfig {
   // negative values disable the limit
   public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1;
 
+  public static final String JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED =
+      "mapreduce.job.dfs.storage.capacity.kill-limit-exceed";
+  public static final boolean DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED = false;
   public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED =
       "mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed";
   // setting to false only logs the kill
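
A sketch of how the new key pairs with its default when read, mirroring the lookup in reportError above (the plain local Configuration here is just for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class ReadFastFailFlag {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        boolean fastFail = conf.getBoolean(
            MRJobConfig.JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED,
            MRJobConfig.DEFAULT_JOB_DFS_STORAGE_CAPACITY_KILL_LIMIT_EXCEED);
        // Prints "false" unless the job opted in, keeping retries the default behavior.
        System.out.println("fast fail on capacity exceeded: " + fastFail);
      }
    }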

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b6625a9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index ccc9c3d..c993537 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -85,6 +85,15 @@
 </property>
 
 <property>
+  <name>mapreduce.job.dfs.storage.capacity.kill-limit-exceed</name>
+  <value>false</value>
+  <description>Whether to fast fail the task when it exceeds the allocated
+    storage capacity in the cluster filesystem (that is, when a
+    ClusterStorageCapacityExceededException occurs), for example by
+    exceeding the dfs quota limitation. If true, the task will fast fail.
+    If false, the task will be retried.</description>
+</property>
+
+<property>
   <name>mapreduce.job.maps</name>
   <value>2</value>
   <description>The default number of map tasks per job.
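
To opt in, a job can set the property before submission; a hypothetical job setup (only the property name and its default come from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SubmitWithFastFail {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Fail the whole job on the first cluster-storage-capacity error
        // instead of retrying the task.
        conf.setBoolean("mapreduce.job.dfs.storage.capacity.kill-limit-exceed", true);
        Job job = Job.getInstance(conf, "quota-sensitive-job");
        // ... configure mapper/reducer/paths, then job.waitForCompletion(true)
      }
    }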

