HDFS-13328. Abstract ReencryptionHandler recursive logic in separate class. 
Contributed by Surendra Singh Lilhore.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f89594f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f89594f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f89594f0

Branch: refs/heads/HDFS-7240
Commit: f89594f0b80e8efffdcb887daa4a18a2b0a228b3
Parents: cef8eb7
Author: Rakesh Radhakrishnan <rake...@apache.org>
Authored: Tue Apr 10 23:35:00 2018 +0530
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Tue Apr 10 23:35:00 2018 +0530

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSTreeTraverser.java   | 339 ++++++++++
 .../server/namenode/ReencryptionHandler.java    | 615 ++++++++-----------
 .../server/namenode/ReencryptionUpdater.java    |   2 +-
 .../hdfs/server/namenode/TestReencryption.java  |   3 -
 .../namenode/TestReencryptionHandler.java       |  10 +-
 5 files changed, 595 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89594f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
new file mode 100644
index 0000000..ff77029
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * FSTreeTraverser traverse directory recursively and process files
+ * in batches.
+ */
+@InterfaceAudience.Private
+public abstract class FSTreeTraverser {
+
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(FSTreeTraverser.class);
+
+  private final FSDirectory dir;
+
+  private long readLockReportingThresholdMs;
+
+  private Timer timer;
+
+  public FSTreeTraverser(FSDirectory dir, Configuration conf) {
+    this.dir = dir;
+    this.readLockReportingThresholdMs = conf.getLong(
+        DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
+        DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
+    timer = new Timer();
+  }
+
+  public FSDirectory getFSDirectory() {
+    return dir;
+  }
+
+  /**
+   * Iterate through all files directly inside parent, and recurse down
+   * directories. The listing is done in batch, and can optionally start after
+   * a position. The iteration of the inode tree is done in a depth-first
+   * fashion. But instead of holding all {@link INodeDirectory}'s in memory
+   * on the fly, only the path components to the current inode is held. This
+   * is to reduce memory consumption.
+   *
+   * @param parent
+   *          The inode id of parent directory
+   * @param startId
+   *          Id of the start inode.
+   * @param startAfter
+   *          Full path of a file the traverse should start after.
+   * @param traverseInfo
+   *          info which may required for processing the child's.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected void traverseDir(final INodeDirectory parent, final long startId,
+      byte[] startAfter, final TraverseInfo traverseInfo)
+      throws IOException, InterruptedException {
+    List<byte[]> startAfters = new ArrayList<>();
+    if (parent == null) {
+      return;
+    }
+    INode curr = parent;
+    // construct startAfters all the way up to the zone inode.
+    startAfters.add(startAfter);
+    while (curr.getId() != startId) {
+      startAfters.add(0, curr.getLocalNameBytes());
+      curr = curr.getParent();
+    }
+    curr = traverseDirInt(startId, parent, startAfters, traverseInfo);
+    while (!startAfters.isEmpty()) {
+      if (curr == null) {
+        // lock was reacquired, re-resolve path.
+        curr = resolvePaths(startId, startAfters);
+      }
+      curr = traverseDirInt(startId, curr, startAfters, traverseInfo);
+    }
+  }
+
+  /**
+   * Iterates the parent directory, and add direct children files to current
+   * batch. If batch size meets configured threshold, current batch will be
+   * submitted for the processing.
+   * <p>
+   * Locks could be released and reacquired when a batch submission is
+   * finished.
+   *
+   * @param startId
+   *          Id of the start inode.
+   * @return The inode which was just processed, if lock is held in the entire
+   *         process. Null if lock is released.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected INode traverseDirInt(final long startId, INode curr,
+      List<byte[]> startAfters, final TraverseInfo traverseInfo)
+      throws IOException, InterruptedException {
+    assert dir.hasReadLock();
+    assert dir.getFSNamesystem().hasReadLock();
+    long lockStartTime = timer.monotonicNow();
+    Preconditions.checkNotNull(curr, "Current inode can't be null");
+    checkINodeReady(startId);
+    final INodeDirectory parent = curr.isDirectory() ? curr.asDirectory()
+        : curr.getParent();
+    ReadOnlyList<INode> children = parent
+        .getChildrenList(Snapshot.CURRENT_STATE_ID);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Traversing directory {}", parent.getFullPathName());
+    }
+
+    final byte[] startAfter = startAfters.get(startAfters.size() - 1);
+    boolean lockReleased = false;
+    for (int i = INodeDirectory.nextChild(children, startAfter); i < children
+        .size(); ++i) {
+      final INode inode = children.get(i);
+      if (!processFileInode(inode, traverseInfo)) {
+        // inode wasn't processes. Recurse down if it's a dir,
+        // skip otherwise.
+        if (!inode.isDirectory()) {
+          continue;
+        }
+
+        if (!canTraverseDir(inode)) {
+          continue;
+        }
+        // add 1 level to the depth-first search.
+        curr = inode;
+        if (!startAfters.isEmpty()) {
+          startAfters.remove(startAfters.size() - 1);
+          startAfters.add(curr.getLocalNameBytes());
+        }
+        startAfters.add(HdfsFileStatus.EMPTY_NAME);
+        return lockReleased ? null : curr;
+      }
+      if (shouldSubmitCurrentBatch()) {
+        final byte[] currentStartAfter = inode.getLocalNameBytes();
+        final String parentPath = parent.getFullPathName();
+        lockReleased = true;
+        readUnlock();
+        submitCurrentBatch(startId);
+        try {
+          throttle();
+          checkPauseForTesting();
+        } finally {
+          readLock();
+          lockStartTime = timer.monotonicNow();
+        }
+        checkINodeReady(startId);
+
+        // Things could have changed when the lock was released.
+        // Re-resolve the parent inode.
+        FSPermissionChecker pc = dir.getPermissionChecker();
+        INode newParent = dir
+            .resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
+            .getLastINode();
+        if (newParent == null || !newParent.equals(parent)) {
+          // parent dir is deleted or recreated. We're done.
+          return null;
+        }
+        children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+        // -1 to counter the ++ on the for loop
+        i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
+      }
+      if ((timer.monotonicNow()
+          - lockStartTime) > readLockReportingThresholdMs) {
+        readUnlock();
+        try {
+          throttle();
+        } finally {
+          readLock();
+          lockStartTime = timer.monotonicNow();
+        }
+      }
+    }
+    // Successfully finished this dir, adjust pointers to 1 level up, and
+    // startAfter this dir.
+    startAfters.remove(startAfters.size() - 1);
+    if (!startAfters.isEmpty()) {
+      startAfters.remove(startAfters.size() - 1);
+      startAfters.add(curr.getLocalNameBytes());
+    }
+    curr = curr.getParent();
+    return lockReleased ? null : curr;
+  }
+
+  /**
+   * Resolve the cursor of traverse to an inode.
+   * <p>
+   * The parent of the lowest level startAfter is returned. If somewhere in the
+   * middle of startAfters changed, the parent of the lowest unchanged level is
+   * returned.
+   *
+   * @param startId
+   *          Id of the start inode.
+   * @param startAfters
+   *          the cursor, represented by a list of path bytes.
+   * @return the parent inode corresponding to the startAfters, or null if the
+   *         furthest parent is deleted.
+   */
+  private INode resolvePaths(final long startId, List<byte[]> startAfters)
+      throws IOException {
+    // If the readlock was reacquired, we need to resolve the paths again
+    // in case things have changed. If our cursor file/dir is changed,
+    // continue from the next one.
+    INode zoneNode = dir.getInode(startId);
+    if (zoneNode == null) {
+      throw new FileNotFoundException("Zone " + startId + " is deleted.");
+    }
+    INodeDirectory parent = zoneNode.asDirectory();
+    for (int i = 0; i < startAfters.size(); ++i) {
+      if (i == startAfters.size() - 1) {
+        // last startAfter does not need to be resolved, since search for
+        // nextChild will cover that automatically.
+        break;
+      }
+      INode curr = parent.getChild(startAfters.get(i),
+          Snapshot.CURRENT_STATE_ID);
+      if (curr == null) {
+        // inode at this level has changed. Update startAfters to point to
+        // the next dir at the parent level (and dropping any startAfters
+        // at lower levels).
+        for (; i < startAfters.size(); ++i) {
+          startAfters.remove(startAfters.size() - 1);
+        }
+        break;
+      }
+      parent = curr.asDirectory();
+    }
+    return parent;
+  }
+
+  protected void readLock() {
+    dir.getFSNamesystem().readLock();
+    dir.readLock();
+  }
+
+  protected void readUnlock() {
+    dir.readUnlock();
+    dir.getFSNamesystem().readUnlock("FSTreeTraverser");
+  }
+
+
+  protected abstract void checkPauseForTesting() throws InterruptedException;
+
+  /**
+   * Process an Inode. Add to current batch if it's a file, no-op otherwise.
+   *
+   * @param inode
+   *          the inode
+   * @return true if inode is added to currentBatch and should be process for
+   *         next operation. false otherwise: could be inode is not a file.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected abstract boolean processFileInode(INode inode,
+      TraverseInfo traverseInfo) throws IOException, InterruptedException;
+
+  /**
+   * Check whether current batch can be submitted for the processing.
+   *
+   * @return true if batch size meets meet the condition, otherwise false.
+   */
+  protected abstract boolean shouldSubmitCurrentBatch();
+
+  /**
+   * Check whether inode is ready for traverse. Throws IOE if it's not.
+   *
+   * @param startId
+   *          Id of the start inode.
+   * @throws IOException
+   */
+  protected abstract void checkINodeReady(long startId) throws IOException;
+
+  /**
+   * Submit the current batch for processing.
+   *
+   * @param startId
+   *          Id of the start inode.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected abstract void submitCurrentBatch(long startId)
+      throws IOException, InterruptedException;
+
+  /**
+   * Throttles the FSTreeTraverser.
+   *
+   * @throws InterruptedException
+   */
+  protected abstract void throttle() throws InterruptedException;
+
+  /**
+   * Check whether dir is traversable or not.
+   *
+   * @param inode
+   *          Dir inode
+   * @return true if dir is traversable otherwise false.
+   * @throws IOException
+   */
+  protected abstract boolean canTraverseDir(INode inode) throws IOException;
+
+  /**
+   * Class will represent the additional info required for traverse.
+   */
+  public static class TraverseInfo {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89594f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
index 65de397..5b52c82 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -29,18 +29,16 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
 import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
 import 
org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
 import 
org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -117,6 +115,8 @@ public class ReencryptionHandler implements Runnable {
   // be single-threaded, see class javadoc for more details.
   private ReencryptionBatch currentBatch;
 
+  private final ReencryptionPendingInodeIdCollector traverser;
+
   private final ReencryptionUpdater reencryptionUpdater;
   private ExecutorService updaterExecutor;
 
@@ -185,16 +185,6 @@ public class ReencryptionHandler implements Runnable {
     reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
   }
 
-  private synchronized void checkPauseForTesting() throws InterruptedException 
{
-    assert !dir.hasReadLock();
-    assert !dir.getFSNamesystem().hasReadLock();
-    while (shouldPauseForTesting) {
-      LOG.info("Sleeping in the re-encrypt handler for unit test.");
-      wait();
-      LOG.info("Continuing re-encrypt handler after pausing.");
-    }
-  }
-
   ReencryptionHandler(final EncryptionZoneManager ezMgr,
       final Configuration conf) {
     this.ezManager = ezMgr;
@@ -255,6 +245,7 @@ public class ReencryptionHandler implements Runnable {
     reencryptionUpdater =
         new ReencryptionUpdater(dir, batchService, this, conf);
     currentBatch = new ReencryptionBatch(reencryptBatchSize);
+    traverser = new ReencryptionPendingInodeIdCollector(dir, this, conf);
   }
 
   ReencryptionStatus getReencryptionStatus() {
@@ -338,7 +329,7 @@ public class ReencryptionHandler implements Runnable {
         synchronized (this) {
           wait(interval);
         }
-        checkPauseForTesting();
+        traverser.checkPauseForTesting();
       } catch (InterruptedException ie) {
         LOG.info("Re-encrypt handler interrupted. Exiting");
         Thread.currentThread().interrupt();
@@ -396,7 +387,7 @@ public class ReencryptionHandler implements Runnable {
     final INode zoneNode;
     final ZoneReencryptionStatus zs;
 
-    readLock();
+    traverser.readLock();
     try {
       zoneNode = dir.getInode(zoneId);
       // start re-encrypting the zone from the beginning
@@ -418,18 +409,19 @@ public class ReencryptionHandler implements Runnable {
           zoneId);
       if (zs.getLastCheckpointFile() == null) {
         // new re-encryption
-        reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME,
-            zs.getEzKeyVersionName());
+        traverser.traverseDir(zoneNode.asDirectory(), zoneId,
+            HdfsFileStatus.EMPTY_NAME,
+            new ZoneTraverseInfo(zs.getEzKeyVersionName()));
       } else {
         // resuming from a past re-encryption
         restoreFromLastProcessedFile(zoneId, zs);
       }
       // save the last batch and mark complete
-      submitCurrentBatch(zoneId);
+      traverser.submitCurrentBatch(zoneId);
       LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
       reencryptionUpdater.markZoneSubmissionDone(zoneId);
     } finally {
-      readUnlock();
+      traverser.readUnlock();
     }
   }
 
@@ -478,131 +470,8 @@ public class ReencryptionHandler implements Runnable {
         dir.getINodesInPath(zs.getLastCheckpointFile(), 
FSDirectory.DirOp.READ);
     parent = lpfIIP.getLastINode().getParent();
     startAfter = lpfIIP.getLastINode().getLocalNameBytes();
-    reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName());
-  }
-
-  /**
-   * Iterate through all files directly inside parent, and recurse down
-   * directories. The listing is done in batch, and can optionally start after
-   * a position.
-   * <p>
-   * Each batch is then send to the threadpool, where KMS will be contacted and
-   * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed
-   * from the threadpool.
-   * <p>
-   * The iteration of the inode tree is done in a depth-first fashion. But
-   * instead of holding all INodeDirectory's in memory on the fly, only the
-   * path components to the current inode is held. This is to reduce memory
-   * consumption.
-   *
-   * @param parent     The inode id of parent directory
-   * @param zoneId     Id of the EZ inode
-   * @param startAfter Full path of a file the re-encrypt should start after.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private void reencryptDir(final INodeDirectory parent, final long zoneId,
-      byte[] startAfter, final String ezKeyVerName)
-      throws IOException, InterruptedException {
-    List<byte[]> startAfters = new ArrayList<>();
-    if (parent == null) {
-      return;
-    }
-    INode curr = parent;
-    // construct startAfters all the way up to the zone inode.
-    startAfters.add(startAfter);
-    while (curr.getId() != zoneId) {
-      startAfters.add(0, curr.getLocalNameBytes());
-      curr = curr.getParent();
-    }
-    curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName);
-    while (!startAfters.isEmpty()) {
-      if (curr == null) {
-        // lock was reacquired, re-resolve path.
-        curr = resolvePaths(zoneId, startAfters);
-      }
-      curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName);
-    }
-  }
-
-  /**
-   * Resolve the cursor of re-encryption to an inode.
-   * <p>
-   * The parent of the lowest level startAfter is returned. If somewhere in the
-   * middle of startAfters changed, the parent of the lowest unchanged level is
-   * returned.
-   *
-   * @param zoneId      Id of the EZ inode.
-   * @param startAfters the cursor, represented by a list of path bytes.
-   * @return the parent inode corresponding to the startAfters, or null if
-   * the EZ node (furthest parent) is deleted.
-   */
-  private INode resolvePaths(final long zoneId, List<byte[]> startAfters)
-      throws IOException {
-    // If the readlock was reacquired, we need to resolve the paths again
-    // in case things have changed. If our cursor file/dir is changed,
-    // continue from the next one.
-    INode zoneNode = dir.getInode(zoneId);
-    if (zoneNode == null) {
-      throw new FileNotFoundException("Zone " + zoneId + " is deleted.");
-    }
-    INodeDirectory parent = zoneNode.asDirectory();
-    for (int i = 0; i < startAfters.size(); ++i) {
-      if (i == startAfters.size() - 1) {
-        // last startAfter does not need to be resolved, since search for
-        // nextChild will cover that automatically.
-        break;
-      }
-      INode curr =
-          parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID);
-      if (curr == null) {
-        // inode at this level has changed. Update startAfters to point to
-        // the next dir at the parent level (and dropping any startAfters
-        // at lower levels).
-        for (; i < startAfters.size(); ++i) {
-          startAfters.remove(startAfters.size() - 1);
-        }
-        break;
-      }
-      parent = curr.asDirectory();
-    }
-    return parent;
-  }
-
-  /**
-   * Submit the current batch to the thread pool.
-   *
-   * @param zoneId Id of the EZ INode
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private void submitCurrentBatch(final long zoneId)
-      throws IOException, InterruptedException {
-    assert dir.hasReadLock();
-    if (currentBatch.isEmpty()) {
-      return;
-    }
-    ZoneSubmissionTracker zst;
-    synchronized (this) {
-      zst = submissions.get(zoneId);
-      if (zst == null) {
-        zst = new ZoneSubmissionTracker();
-        submissions.put(zoneId, zst);
-      }
-    }
-    Future future = batchService
-        .submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
-    zst.addTask(future);
-    LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
-        currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
-    currentBatch = new ReencryptionBatch(reencryptBatchSize);
-    // flip the pause flag if this is nth submission.
-    // The actual pause need to happen outside of the lock.
-    if (pauseAfterNthSubmission > 0) {
-      if (--pauseAfterNthSubmission == 0) {
-        shouldPauseForTesting = true;
-      }
-    }
+    traverser.traverseDir(parent, zoneId, startAfter,
+        new ZoneTraverseInfo(zs.getEzKeyVersionName()));
   }
 
   final class ReencryptionBatch {
@@ -710,256 +579,268 @@ public class ReencryptionHandler implements Runnable {
     }
   }
 
+
   /**
-   * Iterates the parent directory, and add direct children files to
-   * current batch. If batch size meets configured threshold, a Callable
-   * is created and sent to the thread pool, which will communicate to the KMS
-   * to get new edeks.
-   * <p>
-   * Locks could be released and reacquired when a Callable is created.
-   *
-   * @param zoneId Id of the EZ INode
-   * @return The inode which was just processed, if lock is held in the entire
-   * process. Null if lock is released.
-   * @throws IOException
-   * @throws InterruptedException
+   * Called when a new zone is submitted for re-encryption. This will interrupt
+   * the background thread if it's waiting for the next
+   * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
    */
-  private INode reencryptDirInt(final long zoneId, INode curr,
-      List<byte[]> startAfters, final String ezKeyVerName)
-      throws IOException, InterruptedException {
-    assert dir.hasReadLock();
-    assert dir.getFSNamesystem().hasReadLock();
-    Preconditions.checkNotNull(curr, "Current inode can't be null");
-    checkZoneReady(zoneId);
-    final INodeDirectory parent =
-        curr.isDirectory() ? curr.asDirectory() : curr.getParent();
-    ReadOnlyList<INode> children =
-        parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Re-encrypting directory {}", parent.getFullPathName());
-    }
-
-    final byte[] startAfter = startAfters.get(startAfters.size() - 1);
-    boolean lockReleased = false;
-    for (int i = INodeDirectory.nextChild(children, startAfter);
-         i < children.size(); ++i) {
-      final INode inode = children.get(i);
-      if (!reencryptINode(inode, ezKeyVerName)) {
-        // inode wasn't added for re-encryption. Recurse down if it's a dir,
-        // skip otherwise.
-        if (!inode.isDirectory()) {
-          continue;
-        }
-        if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
-          // nested EZ, ignore.
-          LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
-              inode.getFullPathName(), inode.getId());
-          continue;
+  synchronized void notifyNewSubmission() {
+    LOG.debug("Notifying handler for new re-encryption command.");
+    this.notify();
+  }
+
+  public ReencryptionPendingInodeIdCollector getTraverser() {
+    return traverser;
+  }
+
+  /**
+   * ReencryptionPendingInodeIdCollector which throttle based on configured
+   * throttle ratio.
+   */
+  class ReencryptionPendingInodeIdCollector extends FSTreeTraverser {
+
+    private final ReencryptionHandler reencryptionHandler;
+
+    ReencryptionPendingInodeIdCollector(FSDirectory dir,
+        ReencryptionHandler rHandler, Configuration conf) {
+      super(dir, conf);
+      this.reencryptionHandler = rHandler;
+    }
+
+    @Override
+    protected void checkPauseForTesting()
+        throws InterruptedException {
+      assert !dir.hasReadLock();
+      assert !dir.getFSNamesystem().hasReadLock();
+      while (shouldPauseForTesting) {
+        LOG.info("Sleeping in the re-encrypt handler for unit test.");
+        synchronized (reencryptionHandler) {
+          reencryptionHandler.wait(30000);
         }
-        // add 1 level to the depth-first search.
-        curr = inode;
-        if (!startAfters.isEmpty()) {
-          startAfters.remove(startAfters.size() - 1);
-          startAfters.add(curr.getLocalNameBytes());
+        LOG.info("Continuing re-encrypt handler after pausing.");
+      }
+    }
+
+    /**
+     * Process an Inode for re-encryption. Add to current batch if it's a file,
+     * no-op otherwise.
+     *
+     * @param inode
+     *          the inode
+     * @return true if inode is added to currentBatch and should be
+     *         re-encrypted. false otherwise: could be inode is not a file, or
+     *         inode's edek's key version is not changed.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+        throws IOException, InterruptedException {
+      assert dir.hasReadLock();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
+      }
+      if (!inode.isFile()) {
+        return false;
+      }
+      FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
+          dir, INodesInPath.fromINode(inode));
+      if (feInfo == null) {
+        LOG.warn("File {} skipped re-encryption because it is not encrypted! "
+            + "This is very likely a bug.", inode.getId());
+        return false;
+      }
+      if (traverseInfo instanceof ZoneTraverseInfo
+          && ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName().equals(
+              feInfo.getEzKeyVersionName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("File {} skipped re-encryption because edek's key version"
+              + " name is not changed.", inode.getFullPathName());
         }
-        startAfters.add(HdfsFileStatus.EMPTY_NAME);
-        return lockReleased ? null : curr;
+        return false;
+      }
+      currentBatch.add(inode.asFile());
+      return true;
+    }
+
+    /**
+     * Check whether zone is ready for re-encryption. Throws IOE if it's not. 
1.
+     * If EZ is deleted. 2. if the re-encryption is canceled. 3. If NN is not
+     * active or is in safe mode.
+     *
+     * @throws IOException
+     *           if zone does not exist / is cancelled, or if NN is not ready
+     *           for write.
+     */
+    @Override
+    protected void checkINodeReady(long zoneId) throws IOException {
+      final ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus(
+          zoneId);
+      if (zs == null) {
+        throw new IOException("Zone " + zoneId + " status cannot be found.");
+      }
+      if (zs.isCanceled()) {
+        throw new IOException("Re-encryption is canceled for zone " + zoneId);
       }
-      if (currentBatch.size() >= reencryptBatchSize) {
-        final byte[] currentStartAfter = inode.getLocalNameBytes();
-        final String parentPath = parent.getFullPathName();
-        submitCurrentBatch(zoneId);
-        lockReleased = true;
-        readUnlock();
-        try {
-          throttle();
-          checkPauseForTesting();
-        } finally {
-          readLock();
+      dir.getFSNamesystem().checkNameNodeSafeMode(
+          "NN is in safe mode, cannot re-encrypt.");
+      // re-encryption should be cancelled when NN goes to standby. Just
+      // double checking for sanity.
+      dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
+    }
+
+    /**
+     * Submit the current batch to the thread pool.
+     *
+     * @param zoneId
+     *          Id of the EZ INode
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    protected void submitCurrentBatch(final long zoneId) throws IOException,
+        InterruptedException {
+      if (currentBatch.isEmpty()) {
+        return;
+      }
+      ZoneSubmissionTracker zst;
+      synchronized (ReencryptionHandler.this) {
+        zst = submissions.get(zoneId);
+        if (zst == null) {
+          zst = new ZoneSubmissionTracker();
+          submissions.put(zoneId, zst);
         }
-        checkZoneReady(zoneId);
-
-        // Things could have changed when the lock was released.
-        // Re-resolve the parent inode.
-        FSPermissionChecker pc = dir.getPermissionChecker();
-        INode newParent =
-            dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
-                .getLastINode();
-        if (newParent == null || !newParent.equals(parent)) {
-          // parent dir is deleted or recreated. We're done.
-          return null;
+      }
+      Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
+          currentBatch, reencryptionHandler));
+      zst.addTask(future);
+      LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
+          currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
+      currentBatch = new ReencryptionBatch(reencryptBatchSize);
+      // flip the pause flag if this is nth submission.
+      // The actual pause need to happen outside of the lock.
+      if (pauseAfterNthSubmission > 0) {
+        if (--pauseAfterNthSubmission == 0) {
+          shouldPauseForTesting = true;
         }
-        children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
-        // -1 to counter the ++ on the for loop
-        i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
       }
     }
-    // Successfully finished this dir, adjust pointers to 1 level up, and
-    // startAfter this dir.
-    startAfters.remove(startAfters.size() - 1);
-    if (!startAfters.isEmpty()) {
-      startAfters.remove(startAfters.size() - 1);
-      startAfters.add(curr.getLocalNameBytes());
-    }
-    curr = curr.getParent();
-    return lockReleased ? null : curr;
-  }
 
-  private void readLock() {
-    dir.getFSNamesystem().readLock();
-    dir.readLock();
-    throttleTimerLocked.start();
-  }
+    /**
+     * Throttles the ReencryptionHandler in 3 aspects:
+     * 1. Prevents generating more Callables than the CPU could possibly
+     * handle.
+     * 2. Prevents generating more Callables than the ReencryptionUpdater
+     * can handle, under its own throttling.
+     * 3. Prevents contending FSN/FSD read locks. This is done based
+     * on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
+     * <p>
+     * Item 1 and 2 are to control NN heap usage.
+     *
+     * @throws InterruptedException
+     */
+    @VisibleForTesting
+    @Override
+    protected void throttle() throws InterruptedException {
+      assert !dir.hasReadLock();
+      assert !dir.getFSNamesystem().hasReadLock();
+      final int numCores = Runtime.getRuntime().availableProcessors();
+      if (taskQueue.size() >= numCores) {
+        LOG.debug("Re-encryption handler throttling because queue size {} is"
+            + "larger than number of cores {}", taskQueue.size(), numCores);
+        while (taskQueue.size() >= numCores) {
+          Thread.sleep(100);
+        }
+      }
 
-  private void readUnlock() {
-    dir.readUnlock();
-    dir.getFSNamesystem().readUnlock("reencryptHandler");
-    throttleTimerLocked.stop();
-  }
+      // 2. if tasks are piling up on the updater, don't create new callables
+      // until the queue size goes down.
+      final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
+      int numTasks = numTasksSubmitted();
+      if (numTasks >= maxTasksPiled) {
+        LOG.debug("Re-encryption handler throttling because total tasks 
pending"
+            + " re-encryption updater is {}", numTasks);
+        while (numTasks >= maxTasksPiled) {
+          Thread.sleep(500);
+          numTasks = numTasksSubmitted();
+        }
+      }
 
-  /**
-   * Throttles the ReencryptionHandler in 3 aspects:
-   * 1. Prevents generating more Callables than the CPU could possibly handle.
-   * 2. Prevents generating more Callables than the ReencryptionUpdater can
-   *   handle, under its own throttling
-   * 3. Prevents contending FSN/FSD read locks. This is done based on the
-   *   DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
-   * <p>
-   * Item 1 and 2 are to control NN heap usage.
-   *
-   * @throws InterruptedException
-   */
-  @VisibleForTesting
-  void throttle() throws InterruptedException {
-    // 1.
-    final int numCores = Runtime.getRuntime().availableProcessors();
-    if (taskQueue.size() >= numCores) {
-      LOG.debug("Re-encryption handler throttling because queue size {} is"
-          + "larger than number of cores {}", taskQueue.size(), numCores);
-      while (taskQueue.size() >= numCores) {
-        Thread.sleep(100);
+      // 3.
+      if (throttleLimitHandlerRatio >= 1.0) {
+        return;
+      }
+      final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
+          * throttleLimitHandlerRatio);
+      final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
+            + " throttleTimerAll:{}", expect, actual,
+            throttleTimerAll.now(TimeUnit.MILLISECONDS));
       }
+      if (expect - actual < 0) {
+        // in case throttleLimitHandlerRatio is very small, expect will be 0.
+        // so sleepMs should not be calculated from expect, to really meet the
+        // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
+        // should be 1000 - throttleTimerAll.now()
+        final long sleepMs = (long) (actual / throttleLimitHandlerRatio)
+            - throttleTimerAll.now(TimeUnit.MILLISECONDS);
+        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+        Thread.sleep(sleepMs);
+      }
+      throttleTimerAll.reset().start();
+      throttleTimerLocked.reset();
     }
 
-    // 2. if tasks are piling up on the updater, don't create new callables
-    // until the queue size goes down.
-    final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
-    int numTasks = numTasksSubmitted();
-    if (numTasks >= maxTasksPiled) {
-      LOG.debug("Re-encryption handler throttling because total tasks pending"
-          + " re-encryption updater is {}", numTasks);
-      while (numTasks >= maxTasksPiled) {
-        Thread.sleep(500);
-        numTasks = numTasksSubmitted();
+    private int numTasksSubmitted() {
+      int ret = 0;
+      synchronized (ReencryptionHandler.this) {
+        for (ZoneSubmissionTracker zst : submissions.values()) {
+          ret += zst.getTasks().size();
+        }
       }
+      return ret;
     }
 
-    // 3.
-    if (throttleLimitHandlerRatio >= 1.0) {
-      return;
-    }
-    final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
-        * throttleLimitHandlerRatio);
-    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
-              + " throttleTimerAll:{}", expect, actual,
-          throttleTimerAll.now(TimeUnit.MILLISECONDS));
-    }
-    if (expect - actual < 0) {
-      // in case throttleLimitHandlerRatio is very small, expect will be 0.
-      // so sleepMs should not be calculated from expect, to really meet the
-      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
-      // should be 1000 - throttleTimerAll.now()
-      final long sleepMs =
-          (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll
-              .now(TimeUnit.MILLISECONDS);
-      LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
-      Thread.sleep(sleepMs);
+    @Override
+    public boolean shouldSubmitCurrentBatch() {
+      return currentBatch.size() >= reencryptBatchSize;
     }
-    throttleTimerAll.reset().start();
-    throttleTimerLocked.reset();
-  }
 
-  private synchronized int numTasksSubmitted() {
-    int ret = 0;
-    for (ZoneSubmissionTracker zst : submissions.values()) {
-      ret += zst.getTasks().size();
+    @Override
+    public boolean canTraverseDir(INode inode) throws IOException {
+      if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
+        // nested EZ, ignore.
+        LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
+            inode.getFullPathName(), inode.getId());
+        return false;
+      }
+      return true;
     }
-    return ret;
-  }
 
-  /**
-   * Process an Inode for re-encryption. Add to current batch if it's a file,
-   * no-op otherwise.
-   *
-   * @param inode the inode
-   * @return true if inode is added to currentBatch and should be re-encrypted.
-   * false otherwise: could be inode is not a file, or inode's edek's
-   * key version is not changed.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private boolean reencryptINode(final INode inode, final String ezKeyVerName)
-      throws IOException, InterruptedException {
-    assert dir.hasReadLock();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
-    }
-    if (!inode.isFile()) {
-      return false;
-    }
-    FileEncryptionInfo feInfo = FSDirEncryptionZoneOp
-        .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
-    if (feInfo == null) {
-      LOG.warn("File {} skipped re-encryption because it is not encrypted! "
-          + "This is very likely a bug.", inode.getId());
-      return false;
+    @Override
+    protected void readLock() {
+      super.readLock();
+      throttleTimerLocked.start();
     }
-    if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("File {} skipped re-encryption because edek's key version"
-            + " name is not changed.", inode.getFullPathName());
-      }
-      return false;
+
+    @Override
+    protected void readUnlock() {
+      super.readUnlock();
+      throttleTimerLocked.stop();
     }
-    currentBatch.add(inode.asFile());
-    return true;
   }
 
-  /**
-   * Check whether zone is ready for re-encryption. Throws IOE if it's not.
-   * 1. If EZ is deleted.
-   * 2. if the re-encryption is canceled.
-   * 3. If NN is not active or is in safe mode.
-   *
-   * @throws IOException if zone does not exist / is cancelled, or if NN is not
-   *                     ready for write.
-   */
-  void checkZoneReady(final long zoneId)
-      throws RetriableException, SafeModeException, IOException {
-    final ZoneReencryptionStatus zs =
-        getReencryptionStatus().getZoneStatus(zoneId);
-    if (zs == null) {
-      throw new IOException("Zone " + zoneId + " status cannot be found.");
-    }
-    if (zs.isCanceled()) {
-      throw new IOException("Re-encryption is canceled for zone " + zoneId);
+  private class ZoneTraverseInfo extends TraverseInfo {
+    private String ezKeyVerName;
+
+    ZoneTraverseInfo(String ezKeyVerName) {
+      this.ezKeyVerName = ezKeyVerName;
     }
-    dir.getFSNamesystem()
-        .checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
-    // re-encryption should be cancelled when NN goes to standby. Just
-    // double checking for sanity.
-    dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
-  }
 
-  /**
-   * Called when a new zone is submitted for re-encryption. This will interrupt
-   * the background thread if it's waiting for the next
-   * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
-   */
-  synchronized void notifyNewSubmission() {
-    LOG.debug("Notifying handler for new re-encryption command.");
-    this.notify();
+    public String getEzKeyVerName() {
+      return ezKeyVerName;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89594f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
index 3b7badb..a5923a7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
@@ -464,7 +464,7 @@ public final class ReencryptionUpdater implements Runnable {
     final String zonePath;
     dir.writeLock();
     try {
-      handler.checkZoneReady(task.zoneId);
+      handler.getTraverser().checkINodeReady(task.zoneId);
       final INode zoneNode = dir.getInode(task.zoneId);
       if (zoneNode == null) {
         // ez removed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89594f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
index f0d6834..7685f31 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -64,7 +63,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -72,7 +70,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import org.junit.rules.Timeout;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f89594f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
index e2035ed..3481b42 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
@@ -75,6 +75,10 @@ public class TestReencryptionHandler {
         CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
     Mockito.when(ezm.getProvider()).thenReturn(
         KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp));
+    FSDirectory fsd = Mockito.mock(FSDirectory.class);
+    FSNamesystem fns = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsd.getFSNamesystem()).thenReturn(fns);
+    Mockito.when(ezm.getFSDirectory()).thenReturn(fsd);
     return new ReencryptionHandler(ezm, conf);
   }
 
@@ -99,7 +103,7 @@ public class TestReencryptionHandler {
     Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
     Whitebox.setInternalState(rh, "taskQueue", queue);
     final StopWatch sw = new StopWatch().start();
-    rh.throttle();
+    rh.getTraverser().throttle();
     sw.stop();
     assertTrue("should have throttled for at least 8 second",
         sw.now(TimeUnit.MILLISECONDS) > 8000);
@@ -130,7 +134,7 @@ public class TestReencryptionHandler {
         submissions = new HashMap<>();
     Whitebox.setInternalState(rh, "submissions", submissions);
     StopWatch sw = new StopWatch().start();
-    rh.throttle();
+    rh.getTraverser().throttle();
     sw.stop();
     assertTrue("should not have throttled",
         sw.now(TimeUnit.MILLISECONDS) < 1000);
@@ -189,7 +193,7 @@ public class TestReencryptionHandler {
     Whitebox.setInternalState(rh, "submissions", submissions);
     final StopWatch sw = new StopWatch().start();
     removeTaskThread.start();
-    rh.throttle();
+    rh.getTraverser().throttle();
     sw.stop();
     LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS));
     assertTrue("should have throttled for at least 3 second",


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to