This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f1a19b7a3f590f8b1876deb1bae6dfb4bf840edb
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Thu Nov 28 10:43:35 2019 +0800

    HDFS-14986. ReplicaCachingGetSpaceUsed throws 
ConcurrentModificationException. Contributed by Aiphago.
    
    (cherry picked from commit 2b452b4e6063072b2bec491edd3f412eb7ac21f3)
---
 .../org/apache/hadoop/fs/CachingGetSpaceUsed.java  | 34 +++++++++++--
 .../server/datanode/fsdataset/FsDatasetSpi.java    |  6 +++
 .../datanode/fsdataset/impl/FsDatasetImpl.java     | 12 ++---
 .../fsdataset/impl/ReplicaCachingGetSpaceUsed.java |  1 +
 .../impl/TestReplicaCachingGetSpaceUsed.java       | 55 ++++++++++++++++++++++
 5 files changed, 98 insertions(+), 10 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
index 92476d7..58dc82d 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
@@ -47,6 +47,7 @@ public abstract class CachingGetSpaceUsed implements 
Closeable, GetSpaceUsed {
   private final long jitter;
   private final String dirPath;
   private Thread refreshUsed;
+  private boolean shouldFirstRefresh;
 
   /**
    * This is the constructor used by the builder.
@@ -79,16 +80,30 @@ public abstract class CachingGetSpaceUsed implements 
Closeable, GetSpaceUsed {
     this.refreshInterval = interval;
     this.jitter = jitter;
     this.used.set(initialUsed);
+    this.shouldFirstRefresh = true;
   }
 
   void init() {
     if (used.get() < 0) {
       used.set(0);
+      if (!shouldFirstRefresh) {
+        // The initial refresh was skipped, so the first refresh
+        // operation must run immediately in the refresh thread.
+        initRefeshThread(true);
+        return;
+      }
       refresh();
     }
+    initRefeshThread(false);
+  }
 
+  /**
+   * Starts the refresh thread. Pass {@code true} for runImmediately when
+   * the initial refresh was skipped, so the thread refreshes without
+   * waiting for the first interval.
+   * @param runImmediately Whether to refresh immediately; normally false.
+   */
+  private void initRefeshThread (boolean runImmediately) {
     if (refreshInterval > 0) {
-      refreshUsed = new Thread(new RefreshThread(this),
+      refreshUsed = new Thread(new RefreshThread(this, runImmediately),
           "refreshUsed-" + dirPath);
       refreshUsed.setDaemon(true);
       refreshUsed.start();
@@ -101,6 +116,14 @@ public abstract class CachingGetSpaceUsed implements 
Closeable, GetSpaceUsed {
   protected abstract void refresh();
 
   /**
+   * Set whether the first refresh should be performed during init().
+   * @param shouldFirstRefresh The flag value to set.
+   */
+  protected void setShouldFirstRefresh(boolean shouldFirstRefresh) {
+    this.shouldFirstRefresh = shouldFirstRefresh;
+  }
+
+  /**
    * @return an estimate of space used in the directory path.
    */
   @Override public long getUsed() throws IOException {
@@ -156,9 +179,11 @@ public abstract class CachingGetSpaceUsed implements 
Closeable, GetSpaceUsed {
   private static final class RefreshThread implements Runnable {
 
     final CachingGetSpaceUsed spaceUsed;
+    private boolean runImmediately;
 
-    RefreshThread(CachingGetSpaceUsed spaceUsed) {
+    RefreshThread(CachingGetSpaceUsed spaceUsed, boolean runImmediately) {
       this.spaceUsed = spaceUsed;
+      this.runImmediately = runImmediately;
     }
 
     @Override
@@ -176,7 +201,10 @@ public abstract class CachingGetSpaceUsed implements 
Closeable, GetSpaceUsed {
           }
           // Make sure that after the jitter we didn't end up at 0.
           refreshInterval = Math.max(refreshInterval, 1);
-          Thread.sleep(refreshInterval);
+          if (!runImmediately) {
+            Thread.sleep(refreshInterval);
+          }
+          runImmediately = false;
           // update the used variable
           spaceUsed.refresh();
         } catch (InterruptedException e) {
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 78a5cfc..578c390 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -661,5 +661,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> 
extends FSDatasetMBean {
    */
   AutoCloseableLock acquireDatasetLock();
 
+  /**
+   * Deep copy the replica info belonging to given block pool.
+   * @param bpid Specified block pool id.
+   * @return A set of replica info.
+   * @throws IOException
+   */
   Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ac92ae4..957b306 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -198,16 +198,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> 
{
     }
   }
 
-  /**
-   * The deepCopyReplica call doesn't use the datasetock since it will lead the
-   * potential deadlock with the {@link FsVolumeList#addBlockPool} call.
-   */
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
-    Set<? extends Replica> replicas =
-        new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
-            : volumeMap.replicas(bpid));
+    Set<? extends Replica> replicas = null;
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
+          EMPTY_SET : volumeMap.replicas(bpid));
+    }
     return Collections.unmodifiableSet(replicas);
   }
 
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java
index 2c1c16e..5acc3c0 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaCachingGetSpaceUsed.java
@@ -59,6 +59,7 @@ public class ReplicaCachingGetSpaceUsed extends 
FSCachingGetSpaceUsed {
 
   public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException {
     super(builder);
+    setShouldFirstRefresh(false);
     volume = builder.getVolume();
     bpid = builder.getBpid();
   }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java
index 45a3916..6abf523 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaCachingGetSpaceUsed.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,8 +28,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,6 +40,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
+import java.util.Set;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
 import static org.junit.Assert.assertEquals;
@@ -145,4 +150,54 @@ public class TestReplicaCachingGetSpaceUsed {
 
     fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true);
   }
+
+  @Test(timeout = 15000)
+  public void testFsDatasetImplDeepCopyReplica() {
+    FsDatasetSpi<?> fsDataset = dataNode.getFSDataset();
+    ModifyThread modifyThread = new ModifyThread();
+    modifyThread.start();
+    String bpid = cluster.getNamesystem(0).getBlockPoolId();
+    int retryTimes = 10;
+
+    while (retryTimes > 0) {
+      try {
+        Set<? extends Replica> replicas = fsDataset.deepCopyReplica(bpid);
+        if (replicas.size() > 0) {
+          retryTimes--;
+        }
+      } catch (IOException e) {
+        modifyThread.setShouldRun(false);
+        Assert.fail("Encounter IOException when deep copy replica.");
+      }
+    }
+    modifyThread.setShouldRun(false);
+  }
+
+  private class ModifyThread extends Thread {
+    private boolean shouldRun = true;
+
+    @Override
+    public void run() {
+      FSDataOutputStream os = null;
+      while (shouldRun) {
+        try {
+          int id = RandomUtils.nextInt();
+          os = fs.create(new Path("/testFsDatasetImplDeepCopyReplica/" + id));
+          byte[] bytes = new byte[2048];
+          InputStream is = new ByteArrayInputStream(bytes);
+          IOUtils.copyBytes(is, os, bytes.length);
+          os.hsync();
+          os.close();
+        } catch (IOException e) {}
+      }
+
+      try {
+        fs.delete(new Path("/testFsDatasetImplDeepCopyReplica"), true);
+      } catch (IOException e) {}
+    }
+
+    private void setShouldRun(boolean shouldRun) {
+      this.shouldRun = shouldRun;
+    }
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to