HDFS-8983. NameNode support for protected directories. (Contributed by Arpit 
Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bdbe53c6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bdbe53c6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bdbe53c6

Branch: refs/heads/HDFS-7285
Commit: bdbe53c676dd4ff135ea2f64d3b9193fe43d7c8e
Parents: e2c9b28
Author: Arpit Agarwal <a...@apache.org>
Authored: Sat Aug 29 09:51:55 2015 -0700
Committer: Arpit Agarwal <a...@apache.org>
Committed: Sat Aug 29 09:52:37 2015 -0700

----------------------------------------------------------------------
 .../fs/CommonConfigurationKeysPublic.java       |   7 +
 .../src/main/resources/core-default.xml         |  10 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../hdfs/server/namenode/FSDirDeleteOp.java     |  40 ++
 .../hdfs/server/namenode/FSDirectory.java       |  63 ++++
 .../namenode/TestProtectedDirectories.java      | 373 +++++++++++++++++++
 6 files changed, 495 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbe53c6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index 24d648f..f3bc2e1 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -85,6 +85,13 @@ public class CommonConfigurationKeysPublic {
   /** Default value for FS_TRASH_CHECKPOINT_INTERVAL_KEY */
   public static final long    FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT = 0;
 
+  /**
+   * Key for the comma-separated list of directories that cannot be
+   * removed unless empty, even by an administrator.
+   */
+  public static final String FS_PROTECTED_DIRECTORIES =
+      "fs.protected.directories";
+
   // TBD: Code is still using hardcoded values (e.g. "fs.automatic.close")
   // instead of constant (e.g. FS_AUTOMATIC_CLOSE_KEY)
   //

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbe53c6/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index d02f0ac..cef32d3 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -565,6 +565,16 @@ for ldap providers in the same way as above does.
 </property>
 
 <property>
+  <name>fs.protected.directories</name>
+  <value></value>
+  <description>A comma-separated list of directories which cannot
+    be deleted even by the superuser unless they are empty. This
+    setting can be used to guard important system directories
+    against accidental deletion due to administrator error.
+  </description>
+</property>
+
+<property>
   <name>fs.AbstractFileSystem.file.impl</name>
   <value>org.apache.hadoop.fs.local.LocalFs</value>
   <description>The AbstractFileSystem for file: uris.</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbe53c6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a561909..6f46ea5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -861,6 +861,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8925. Move BlockReaderLocal to hdfs-client.
     (Mingliang Liu via wheat9)
 
+    HDFS-8983. NameNode support for protected directories. (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbe53c6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
index b0e9a5c..51d643a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
@@ -17,15 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INode.ReclaimContext;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ChunkedArrayList;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.SortedSet;
 
 import static 
org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 import static org.apache.hadoop.util.Time.now;
@@ -103,6 +107,9 @@ class FSDirDeleteOp {
       fsd.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
                           FsAction.ALL, true);
     }
+    if (recursive && fsd.isNonEmptyDirectory(iip)) {
+      checkProtectedDescendants(fsd, fsd.normalizePath(src));
+    }
 
     return deleteInternal(fsn, src, iip, logRetryCache);
   }
@@ -262,4 +269,37 @@ class FSDirDeleteOp {
     }
     return true;
   }
+
+  /**
+   * Throw if src, or any protected directory beneath it, is non-empty.
+   *
+   * @param fsd directory tree that supplies the protected-directory set.
+   * @param src directory whose descendants are to be checked. The caller
+   *            must ensure src is not terminated with {@link Path#SEPARATOR}.
+   * @throws AccessControlException if a non-empty protected descendant
+   *                                was found.
+   */
+  private static void checkProtectedDescendants(FSDirectory fsd, String src)
+      throws AccessControlException, UnresolvedLinkException {
+    final SortedSet<String> protectedDirs = fsd.getProtectedDirectories();
+
+    // Is src protected? Caller has already checked it is non-empty.
+    if (protectedDirs.contains(src)) {
+      throw new AccessControlException(
+          "Cannot delete non-empty protected directory " + src);
+    }
+
+    // Are any descendants of src protected?
+    // The half-open range [src + "/", src + "0") holds exactly the
+    // entries that start with src + "/", because '0' is the character
+    // immediately after '/' (Path.SEPARATOR) in ASCII.
+    for (String descendant :
+            protectedDirs.subSet(src + Path.SEPARATOR, src + "0")) {
+      if (fsd.isNonEmptyDirectory(fsd.getINodesInPath4Write(
+              descendant, false))) {
+        throw new AccessControlException(
+            "Cannot delete non-empty protected subdirectory " + descendant);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbe53c6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 409174a..734d3c0 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -70,9 +70,12 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT;
@@ -137,6 +140,13 @@ public class FSDirectory implements Closeable {
 
   private final int inodeXAttrsLimit; //inode xattrs max limit
 
+  // A set of directories that have been protected using the
+  // fs.protected.directories setting. These directories cannot
+  // be deleted unless they are empty.
+  //
+  // Each entry in this set must be a normalized path.
+  private final SortedSet<String> protectedDirectories;
+
   // lock to protect the directory and BlockMap
   private final ReentrantReadWriteLock dirLock;
 
@@ -278,6 +288,8 @@ public class FSDirectory implements Closeable {
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_DEFAULT);
 
+    this.protectedDirectories = parseProtectedDirectories(conf);
+
     Preconditions.checkArgument(this.inodeXAttrsLimit >= 0,
         "Cannot set a negative limit on the number of xattrs per inode (%s).",
         DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY);
@@ -305,6 +317,25 @@ public class FSDirectory implements Closeable {
     return namesystem;
   }
 
+  /**
+   * Parse configuration setting fs.protected.directories to
+   * retrieve the set of protected directories.
+   *
+   * @param conf configuration to read the setting from.
+   * @return a sorted set holding the normalized form of each accepted path.
+   */
+  @VisibleForTesting
+  static SortedSet<String> parseProtectedDirectories(Configuration conf) {
+    // Normalize each input path to guard against administrator error.
+    return new TreeSet<>(normalizePaths(
+        conf.getTrimmedStringCollection(FS_PROTECTED_DIRECTORIES),
+        FS_PROTECTED_DIRECTORIES));
+  }
+
+  SortedSet<String> getProtectedDirectories() {
+    return protectedDirectories; // the internal set itself, not a copy
+  }
+
   BlockManager getBlockManager() {
     return getFSNamesystem().getBlockManager();
   }
@@ -905,6 +936,38 @@ public class FSDirectory implements Closeable {
         && INodeReference.tryRemoveReference(last) > 0) ? 0 : 1;
   }
 
+  /**
+   * Return the normalized form of every accepted path in the input
+   * collection, which is never modified. An empty input is returned as-is.
+   * Reserved paths, relative paths and paths with scheme are ignored.
+   *
+   * @param paths collection whose contents are to be normalized.
+   * @param errorString prefix of the error logged for each ignored path.
+   * @return collection with all accepted input paths normalized.
+   */
+  static Collection<String> normalizePaths(Collection<String> paths,
+                                           String errorString) {
+    if (paths.isEmpty()) {
+      return paths;
+    }
+    final Collection<String> normalized = new ArrayList<>(paths.size());
+    for (String dir : paths) {
+      if (isReservedName(dir)) {
+        LOG.error("{} ignoring reserved path {}", errorString, dir);
+      } else {
+        final Path path = new Path(dir);
+        if (!path.isAbsolute()) {
+          LOG.error("{} ignoring relative path {}", errorString, dir);
+        } else if (path.toUri().getScheme() != null) {
+          LOG.error("{} ignoring path {} with scheme", errorString, dir);
+        } else {
+          normalized.add(path.toString());
+        }
+      }
+    }
+    return normalized;
+  }
+
   static String normalizePath(String src) {
     if (src.length() > 1 && src.endsWith("/")) {
       src = src.substring(0, src.length() - 1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbe53c6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProtectedDirectories.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProtectedDirectories.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProtectedDirectories.java
new file mode 100644
index 0000000..be7b686
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProtectedDirectories.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.AccessControlException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Verify that the fs.protected.directories setting is respected.
+ */
+public class TestProtectedDirectories {
+  static final Logger LOG = LoggerFactory.getLogger(
+      TestProtectedDirectories.class);
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  /**
+   * Start a namenode-only 'cluster' which is configured to protect
+   * the given list of directories.
+   * @param conf configuration that receives the protected directory list.
+   * @param protectedDirs directories to protect; also created on startup.
+   * @param unProtectedDirs additional directories to create, unprotected.
+   * @return the running cluster; the caller must shut it down.
+   * @throws Throwable if cluster startup or directory creation throws.
+   */
+  public MiniDFSCluster setupTestCase(Configuration conf,
+                                      Collection<Path> protectedDirs,
+                                      Collection<Path> unProtectedDirs)
+      throws Throwable {
+    // Initialize the configuration.
+    conf.set(
+        CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES,
+        Joiner.on(",").skipNulls().join(protectedDirs));
+
+    // Start the cluster.
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+
+    // Create all the directories.
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      for (Path path : Iterables.concat(protectedDirs, unProtectedDirs)) {
+        fs.mkdirs(path);
+      }
+      return cluster;
+    } catch (Throwable t) {
+      cluster.shutdown();
+      throw t;
+    }
+  }
+
+  /**
+   * Initialize a collection of file system layouts that will be used
+   * as the test matrix.
+   *
+   * @return the layouts, each tagged with per-path deletion expectations.
+   */
+  private Collection<TestMatrixEntry> createTestMatrix() {
+    Collection<TestMatrixEntry> matrix = new ArrayList<TestMatrixEntry>();
+
+    // single empty unprotected dir.
+    matrix.add(TestMatrixEntry.get()
+        .addUnprotectedDir("/1", true));
+
+    // Single empty protected dir.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1", true));
+
+    // Nested unprotected dirs.
+    matrix.add(TestMatrixEntry.get()
+        .addUnprotectedDir("/1", true)
+        .addUnprotectedDir("/1/2", true)
+        .addUnprotectedDir("/1/2/3", true)
+        .addUnprotectedDir("/1/2/3/4", true));
+
+    // Non-empty protected dir.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1", false)
+        .addUnprotectedDir("/1/2", true));
+
+    // Protected empty child of unprotected parent.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1/2", true)
+        .addUnprotectedDir("/1", true));
+
+    // Protected empty child of protected parent.
+    // We should not be able to delete the parent.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1", false)
+        .addProtectedDir("/1/2", true));
+
+    // One of each, non-nested.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1", true)
+        .addUnprotectedDir("/a", true));
+
+    // Protected non-empty child of unprotected parent.
+    // Neither should be deletable.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1/2", false)
+        .addUnprotectedDir("/1/2/3", true)
+        .addUnprotectedDir("/1", false));
+
+    // Protected non-empty child has unprotected siblings.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1/2.2", false)
+        .addUnprotectedDir("/1/2.2/3", true)
+        .addUnprotectedDir("/1/2.1", true)
+        .addUnprotectedDir("/1/2.3", true)
+        .addUnprotectedDir("/1", false));
+
+    // Deeply nested protected child.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1/2/3/4/5", false)
+        .addUnprotectedDir("/1/2/3/4/5/6", true)
+        .addUnprotectedDir("/1", false)
+        .addUnprotectedDir("/1/2", false)
+        .addUnprotectedDir("/1/2/3", false)
+        .addUnprotectedDir("/1/2/3/4", false));
+
+    // Disjoint trees.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/1/2", false)
+        .addProtectedDir("/a/b", false)
+        .addUnprotectedDir("/1/2/3", true)
+        .addUnprotectedDir("/a/b/c", true));
+
+    // The following tests exercise special cases in the path prefix
+    // checks and handling of trailing separators.
+
+    // A disjoint non-empty protected dir has the same string prefix as the
+    // directory we are trying to delete.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/a1", false)
+        .addUnprotectedDir("/a1/a2", true)
+        .addUnprotectedDir("/a", true));
+
+    // The directory we are trying to delete has a non-empty protected
+    // child and we try to delete it with a trailing separator.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/a/b", false)
+        .addUnprotectedDir("/a/b/c", true)
+        .addUnprotectedDir("/a/", false));
+
+    // The directory we are trying to delete has an empty protected
+    // child and we try to delete it with a trailing separator.
+    matrix.add(TestMatrixEntry.get()
+        .addProtectedDir("/a/b", true)
+        .addUnprotectedDir("/a/", true));
+
+    return matrix;
+  }
+
+  @Test
+  public void testAll() throws Throwable {
+    for (TestMatrixEntry testMatrixEntry : createTestMatrix()) {
+      Configuration conf = new HdfsConfiguration();
+      MiniDFSCluster cluster = setupTestCase(
+          conf, testMatrixEntry.getProtectedPaths(),
+          testMatrixEntry.getUnprotectedPaths());
+
+      try {
+        LOG.info("Running {}", testMatrixEntry);
+        FileSystem fs = cluster.getFileSystem();
+        for (Path path : testMatrixEntry.getAllPathsToBeDeleted()) {
+          final long countBefore = cluster.getNamesystem().getFilesTotal();
+          assertThat(
+              testMatrixEntry + ": Testing whether " + path + " can be deleted",
+              deletePath(fs, path),
+              is(testMatrixEntry.canPathBeDeleted(path)));
+          final long countAfter = cluster.getNamesystem().getFilesTotal();
+
+          if (!testMatrixEntry.canPathBeDeleted(path)) {
+            assertThat(
+                "Either all paths should be deleted or none",
+                countAfter, is(countBefore));
+          }
+        }
+      } finally {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Verify that configured paths are normalized by removing
+   * redundant separators.
+   */
+  @Test
+  public void testProtectedDirNormalization1() {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(
+        CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES,
+        "/foo//bar");
+    Collection<String> paths = FSDirectory.parseProtectedDirectories(conf);
+    assertThat(paths.size(), is(1));
+    assertThat(paths.iterator().next(), is("/foo/bar"));
+  }
+
+  /**
+   * Verify that configured paths are normalized by removing
+   * trailing separators.
+   */
+  @Test
+  public void testProtectedDirNormalization2() {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(
+        CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES,
+        "/a/b/,/c,/d/e/f/");
+    Collection<String> paths = FSDirectory.parseProtectedDirectories(conf);
+
+    for (String path : paths) {
+      assertFalse(path.endsWith("/"));
+    }
+  }
+
+  /**
+   * Verify that configured paths are canonicalized.
+   */
+  @Test
+  public void testProtectedDirIsCanonicalized() {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(
+        CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES,
+        "/foo/../bar/");
+    Collection<String> paths = FSDirectory.parseProtectedDirectories(conf);
+    assertThat(paths.size(), is(1));
+    assertThat(paths.iterator().next(), is("/bar"));
+  }
+
+  /**
+   * Verify that the root directory in the configuration is correctly handled.
+   */
+  @Test
+  public void testProtectedRootDirectory() {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(
+        CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES, "/");
+    Collection<String> paths = FSDirectory.parseProtectedDirectories(conf);
+    assertThat(paths.size(), is(1));
+    assertThat(paths.iterator().next(), is("/"));
+  }
+
+  /**
+   * Verify that invalid paths in the configuration are filtered out.
+   * (Path with scheme, reserved path).
+   */
+  @Test
+  public void testBadPathsInConfig() {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(
+        CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES,
+        "hdfs://foo/,/.reserved/foo");
+    Collection<String> paths = FSDirectory.parseProtectedDirectories(conf);
+    assertThat("Unexpected directories " + paths,
+        paths.size(), is(0));
+  }
+
+  /**
+   * Attempt a recursive delete. Return true if the call completed
+   * without AccessControlException (its boolean result is ignored),
+   * false if it threw one. Other exceptions propagate to the caller.
+   *
+   * @param fs file system to issue the delete against.
+   * @param path path to delete recursively.
+   * @return false only when AccessControlException was thrown.
+   */
+  private boolean deletePath(FileSystem fs, Path path) throws IOException {
+    try {
+      fs.delete(path, true);
+      return true;
+    } catch (AccessControlException ace) {
+      return false;
+    }
+  }
+
+  private static class TestMatrixEntry {
+    // true if the path can be deleted.
+    final Map<Path, Boolean> protectedPaths = Maps.newHashMap();
+    final Map<Path, Boolean> unProtectedPaths = Maps.newHashMap();
+
+    private TestMatrixEntry() {
+    }
+
+    public static TestMatrixEntry get() {
+      return new TestMatrixEntry();
+    }
+
+    public Collection<Path> getProtectedPaths() {
+      return protectedPaths.keySet();
+    }
+
+    public Collection<Path> getUnprotectedPaths() {
+      return unProtectedPaths.keySet();
+    }
+
+    /**
+     * Get all paths to be deleted in sorted order.
+     * @return sorted collection of paths to be deleted.
+     */
+    @SuppressWarnings("unchecked") // Path implements Comparable incorrectly
+    public Iterable<Path> getAllPathsToBeDeleted() {
+      // Sorting ensures deletion of parents is attempted first.
+      ArrayList<Path> combined = new ArrayList<>();
+      combined.addAll(protectedPaths.keySet());
+      combined.addAll(unProtectedPaths.keySet());
+      Collections.sort(combined);
+      return combined;
+    }
+
+    public boolean canPathBeDeleted(Path path) {
+      return protectedPaths.containsKey(path) ?
+          protectedPaths.get(path) : unProtectedPaths.get(path);
+    }
+
+
+    public TestMatrixEntry addProtectedDir(String dir, boolean canBeDeleted) {
+      protectedPaths.put(new Path(dir), canBeDeleted);
+      return this;
+    }
+
+    public TestMatrixEntry addUnprotectedDir(String dir, boolean canBeDeleted) {
+      unProtectedPaths.put(new Path(dir), canBeDeleted);
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "TestMatrixEntry - ProtectedPaths=[" +
+          Joiner.on(", ").join(protectedPaths.keySet()) +
+          "]; UnprotectedPaths=[" +
+          Joiner.on(", ").join(unProtectedPaths.keySet()) + "]";
+    }
+  }
+}

Reply via email to