HDFS-10713. Throttle FsNameSystem lock warnings. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a349c546
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a349c546
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a349c546

Branch: refs/heads/branch-2.8
Commit: a349c54603c5cb7327d3bbb22285bd8e686c4790
Parents: 5bc297c
Author: Arpit Agarwal <a...@apache.org>
Authored: Tue Sep 27 09:00:39 2016 -0700
Committer: Arpit Agarwal <a...@apache.org>
Committed: Tue Sep 27 09:05:53 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/namenode/FSNamesystem.java      | 102 ++++++++++++++++---
 .../hdfs/server/namenode/TestFSNamesystem.java  |  85 ++++++++++++----
 2 files changed, 151 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a349c546/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index c3f206e..614962ae 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -76,6 +76,8 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPOR
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
@@ -136,6 +138,8 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -293,6 +297,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Timer;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
 import org.apache.log4j.AsyncAppender;
@@ -729,6 +734,7 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     fsLock = new FSNamesystemLock(fair);
     cond = fsLock.writeLock().newCondition();
     cpLock = new ReentrantLock();
+    setTimer(new Timer());
 
     this.fsImage = fsImage;
     try {
@@ -845,6 +851,10 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
           DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
           DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
 
+      this.lockSuppressWarningInterval = conf.getTimeDuration(
+          DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+          DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+
       // For testing purposes, allow the DT secret manager to be started 
regardless
       // of whether security is enabled.
       alwaysUseDelegationTokensForTests = conf.getBoolean(
@@ -1509,12 +1519,20 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     return Util.stringCollectionAsURIs(dirNames);
   }
 
+  private final long lockSuppressWarningInterval;
   /** Threshold (ms) for long holding write lock report. */
-  private long writeLockReportingThreshold;
+  private final long writeLockReportingThreshold;
+  private int numWriteLockWarningsSuppressed = 0;
+  private long timeStampOfLastWriteLockReport = 0;
+  private long longestWriteLockHeldInterval = 0;
   /** Last time stamp for write lock. Keep the longest one for 
multi-entrance.*/
   private long writeLockHeldTimeStamp;
   /** Threshold (ms) for long holding read lock report. */
   private long readLockReportingThreshold;
+  private AtomicInteger numReadLockWarningsSuppressed = new AtomicInteger(0);
+  private AtomicLong timeStampOfLastReadLockReport = new AtomicLong(0);
+  private AtomicLong longestReadLockHeldInterval = new AtomicLong(0);
+  private Timer timer;
   /**
    * Last time stamp for read lock. Keep the longest one for
    * multi-entrance. This is ThreadLocal since there could be
@@ -1532,48 +1550,99 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
   public void readLock() {
     this.fsLock.readLock().lock();
     if (this.fsLock.getReadHoldCount() == 1) {
-      readLockHeldTimeStamp.set(monotonicNow());
+      readLockHeldTimeStamp.set(timer.monotonicNow());
     }
   }
   @Override
   public void readUnlock() {
     final boolean needReport = this.fsLock.getReadHoldCount() == 1;
-    final long readLockInterval = monotonicNow() - readLockHeldTimeStamp.get();
-    this.fsLock.readLock().unlock();
-
+    final long readLockInterval = timer.monotonicNow() -
+        readLockHeldTimeStamp.get();
     if (needReport) {
       readLockHeldTimeStamp.remove();
-      if (readLockInterval > this.readLockReportingThreshold) {
-        LOG.info("FSNamesystem read lock held for " + readLockInterval +
-            " ms via\n" + StringUtils.getStackTrace(Thread.currentThread()));
-      }
+    }
+
+    this.fsLock.readLock().unlock();
+
+    if (needReport && readLockInterval >= this.readLockReportingThreshold) {
+      long localLongestReadLock;
+      do {
+        localLongestReadLock = longestReadLockHeldInterval.get();
+      } while (localLongestReadLock - readLockInterval < 0
+          && !longestReadLockHeldInterval.compareAndSet(localLongestReadLock,
+                                                        readLockInterval));
+
+      long localTimeStampOfLastReadLockReport;
+      long now;
+      do {
+        now = timer.monotonicNow();
+        localTimeStampOfLastReadLockReport = timeStampOfLastReadLockReport
+            .get();
+        if (now - localTimeStampOfLastReadLockReport <
+            lockSuppressWarningInterval) {
+          numReadLockWarningsSuppressed.incrementAndGet();
+          return;
+        }
+      } while (!timeStampOfLastReadLockReport.compareAndSet(
+          localTimeStampOfLastReadLockReport, now));
+      int numSuppressedWarnings = numReadLockWarningsSuppressed.getAndSet(0);
+      long longestLockHeldInterval = longestReadLockHeldInterval.getAndSet(0);
+      LOG.info("FSNamesystem read lock held for " + readLockInterval +
+          " ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) +
+          "\tNumber of suppressed read-lock reports: " +
+          numSuppressedWarnings + "\n\tLongest read-lock held interval: " +
+          longestLockHeldInterval);
     }
   }
   @Override
   public void writeLock() {
     this.fsLock.writeLock().lock();
     if (fsLock.getWriteHoldCount() == 1) {
-      writeLockHeldTimeStamp = monotonicNow();
+      writeLockHeldTimeStamp = timer.monotonicNow();
     }
   }
   @Override
   public void writeLockInterruptibly() throws InterruptedException {
     this.fsLock.writeLock().lockInterruptibly();
     if (fsLock.getWriteHoldCount() == 1) {
-      writeLockHeldTimeStamp = monotonicNow();
+      writeLockHeldTimeStamp = timer.monotonicNow();
     }
   }
   @Override
   public void writeUnlock() {
     final boolean needReport = fsLock.getWriteHoldCount() == 1 &&
         fsLock.isWriteLockedByCurrentThread();
-    final long writeLockInterval = monotonicNow() - writeLockHeldTimeStamp;
+    final long currentTime = timer.monotonicNow();
+    final long writeLockInterval = currentTime - writeLockHeldTimeStamp;
+
+    boolean logReport = false;
+    int numSuppressedWarnings = 0;
+    long longestLockHeldInterval = 0;
+    if (needReport && writeLockInterval >= this.writeLockReportingThreshold) {
+      if (writeLockInterval > longestWriteLockHeldInterval) {
+        longestWriteLockHeldInterval = writeLockInterval;
+      }
+      if (currentTime - timeStampOfLastWriteLockReport > this
+          .lockSuppressWarningInterval) {
+        logReport = true;
+        numSuppressedWarnings = numWriteLockWarningsSuppressed;
+        numWriteLockWarningsSuppressed = 0;
+        longestLockHeldInterval = longestWriteLockHeldInterval;
+        longestWriteLockHeldInterval = 0;
+        timeStampOfLastWriteLockReport = currentTime;
+      } else {
+        numWriteLockWarningsSuppressed++;
+      }
+    }
 
     this.fsLock.writeLock().unlock();
 
-    if (needReport && writeLockInterval >= this.writeLockReportingThreshold) {
+    if (logReport) {
       LOG.info("FSNamesystem write lock held for " + writeLockInterval +
-          " ms via\n" + StringUtils.getStackTrace(Thread.currentThread()));
+          " ms via\n" + StringUtils.getStackTrace(Thread.currentThread()) +
+          "\tNumber of suppressed write-lock reports: " +
+          numSuppressedWarnings + "\n\tLongest write-lock held interval: " +
+              longestLockHeldInterval);
     }
   }
   @Override
@@ -7636,5 +7705,10 @@ public class FSNamesystem implements Namesystem, 
FSNamesystemMBean,
     newSafemode.enter();
     this.safeMode = newSafemode;
   }
+
+  @VisibleForTesting
+  void setTimer(Timer newTimer) {
+    this.timer = newTimer;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a349c546/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
index dedf2ac..6c202f4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import org.apache.hadoop.util.FakeTimer;
 import static org.hamcrest.CoreMatchers.either;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.*;
@@ -28,7 +29,6 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 
 import com.google.common.base.Supplier;
@@ -59,8 +59,8 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class TestFSNamesystem {
@@ -296,45 +296,54 @@ public class TestFSNamesystem {
   @Test(timeout=45000)
   public void testFSWriteLockLongHoldingReport() throws Exception {
     final long writeLockReportingThreshold = 100L;
+    final long writeLockSuppressWarningInterval = 10000L;
     Configuration conf = new Configuration();
     
conf.setLong(DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
         writeLockReportingThreshold);
+    conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+        writeLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
     FSImage fsImage = Mockito.mock(FSImage.class);
     FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
     Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
     final FSNamesystem fsn = new FSNamesystem(conf, fsImage);
 
+    FakeTimer timer = new FakeTimer();
+    fsn.setTimer(timer);
+    timer.advance(writeLockSuppressWarningInterval);
+
     LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
     GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
 
     // Don't report if the write lock is held for a short time
     fsn.writeLock();
-    Thread.sleep(writeLockReportingThreshold / 2);
     fsn.writeUnlock();
     assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
 
-
-    // Report if the write lock is held for a long time
+    // Report the first write lock warning if it is held for a long time
     fsn.writeLock();
-    Thread.sleep(writeLockReportingThreshold + 10);
+    timer.advance(writeLockReportingThreshold + 10);
     logs.clearOutput();
     fsn.writeUnlock();
     assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
 
-    // Report if the write lock is held (interruptibly) for a long time
+    // Track but do not report if the write lock is held (interruptibly) for
+    // a long time but time since last report does not exceed the suppress
+    // warning interval
     fsn.writeLockInterruptibly();
-    Thread.sleep(writeLockReportingThreshold + 10);
+    timer.advance(writeLockReportingThreshold + 10);
     logs.clearOutput();
     fsn.writeUnlock();
-    assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
 
-    // Report if it's held for a long time when re-entering write lock
+    // Track but do not report if it's held for a long time when re-entering
+    // write lock but time since last report does not exceed the suppress
+    // warning interval
     fsn.writeLock();
-    Thread.sleep(writeLockReportingThreshold/ 2 + 1);
+    timer.advance(writeLockReportingThreshold/ 2 + 1);
     fsn.writeLockInterruptibly();
-    Thread.sleep(writeLockReportingThreshold / 2 + 1);
+    timer.advance(writeLockReportingThreshold/ 2 + 1);
     fsn.writeLock();
-    Thread.sleep(writeLockReportingThreshold / 2);
+    timer.advance(writeLockReportingThreshold/ 2);
     logs.clearOutput();
     fsn.writeUnlock();
     assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
@@ -343,7 +352,18 @@ public class TestFSNamesystem {
     assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
     logs.clearOutput();
     fsn.writeUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+
+    // Report if it's held for a long time and time since last report exceeds
+    // the suppress warning interval
+    timer.advance(writeLockSuppressWarningInterval);
+    fsn.writeLock();
+    timer.advance(writeLockReportingThreshold + 100);
+    logs.clearOutput();
+    fsn.writeUnlock();
     assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()));
+    assertTrue(logs.getOutput().contains("Number of suppressed write-lock " +
+        "reports: 2"));
   }
 
   /**
@@ -353,52 +373,71 @@ public class TestFSNamesystem {
   @Test(timeout=45000)
   public void testFSReadLockLongHoldingReport() throws Exception {
     final long readLockReportingThreshold = 100L;
+    final long readLockSuppressWarningInterval = 10000L;
     final String readLockLogStmt = "FSNamesystem read lock held for ";
     Configuration conf = new Configuration();
     conf.setLong(
         DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
         readLockReportingThreshold);
+    conf.setTimeDuration(DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+        readLockSuppressWarningInterval, TimeUnit.MILLISECONDS);
     FSImage fsImage = Mockito.mock(FSImage.class);
     FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
     Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
     final FSNamesystem fsn = new FSNamesystem(conf, fsImage);
 
+    final FakeTimer timer = new FakeTimer();
+    fsn.setTimer(timer);
+    timer.advance(readLockSuppressWarningInterval);
+
     LogCapturer logs = LogCapturer.captureLogs(FSNamesystem.LOG);
     GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.INFO);
 
     // Don't report if the read lock is held for a short time
     fsn.readLock();
-    Thread.sleep(readLockReportingThreshold / 2);
     fsn.readUnlock();
     assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
         logs.getOutput().contains(readLockLogStmt));
 
-    // Report if the read lock is held for a long time
+    // Report the first read lock warning if it is held for a long time
     fsn.readLock();
-    Thread.sleep(readLockReportingThreshold + 10);
+    timer.advance(readLockReportingThreshold + 10);
     logs.clearOutput();
     fsn.readUnlock();
     assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName())
         && logs.getOutput().contains(readLockLogStmt));
 
-    // Report if it's held for a long time when re-entering read lock
+    // Track but do not report if the read lock is held for a long time but
+    // time since last report does not exceed the suppress warning interval
+    fsn.readLock();
+    timer.advance(readLockReportingThreshold + 10);
+    logs.clearOutput();
+    fsn.readUnlock();
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName())
+        && logs.getOutput().contains(readLockLogStmt));
+
+    // Track but do not report if it's held for a long time when re-entering
+    // read lock but time since last report does not exceed the suppress
+    // warning interval
     fsn.readLock();
-    Thread.sleep(readLockReportingThreshold / 2 + 1);
+    timer.advance(readLockReportingThreshold / 2 + 1);
     fsn.readLock();
-    Thread.sleep(readLockReportingThreshold / 2 + 1);
+    timer.advance(readLockReportingThreshold / 2 + 1);
     logs.clearOutput();
     fsn.readUnlock();
     assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) ||
         logs.getOutput().contains(readLockLogStmt));
     logs.clearOutput();
     fsn.readUnlock();
-    assertTrue(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
+    assertFalse(logs.getOutput().contains(GenericTestUtils.getMethodName()) &&
         logs.getOutput().contains(readLockLogStmt));
 
-    // Report if it's held for a long time while another thread also has the
+    // Report if it's held for a long time (and time since last report
+    // exceeds the suppress warning interval) while another thread also has the
     // read lock. Let one thread hold the lock long enough to activate an
     // alert, then have another thread grab the read lock to ensure that this
     // doesn't reset the timing.
+    timer.advance(readLockSuppressWarningInterval);
     logs.clearOutput();
     final CountDownLatch barrier = new CountDownLatch(1);
     final CountDownLatch barrier2 = new CountDownLatch(1);
@@ -407,7 +446,7 @@ public class TestFSNamesystem {
       public void run() {
         try {
           fsn.readLock();
-          Thread.sleep(readLockReportingThreshold + 1);
+          timer.advance(readLockReportingThreshold + 1);
           barrier.countDown(); // Allow for t2 to acquire the read lock
           barrier2.await(); // Wait until t2 has the read lock
           fsn.readUnlock();
@@ -442,6 +481,8 @@ public class TestFSNamesystem {
     Pattern t2Pattern = Pattern.compile(
         String.format(stackTracePatternString, t2.getClass().getName()));
     assertFalse(t2Pattern.matcher(logs.getOutput()).find());
+    assertTrue(logs.getOutput().contains("Number of suppressed read-lock " +
+        "reports: 2"));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to