This is an automated email from the ASF dual-hosted git repository.

shahrs87 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 126322024d04 HDFS-17299. Adding rack failure tolerance when creating a 
new file (#6612)
126322024d04 is described below

commit 126322024d04d406fc219eadcd704aba7ad7d3b0
Author: ritegarg <58840065+riteg...@users.noreply.github.com>
AuthorDate: Thu Mar 14 15:31:53 2024 -0700

    HDFS-17299. Adding rack failure tolerance when creating a new file (#6612)
---
 .../java/org/apache/hadoop/hdfs/DataStreamer.java  |  69 +++++++----
 .../apache/hadoop/hdfs/StripedDataStreamer.java    |  12 +-
 .../hadoop/hdfs/server/datanode/BlockReceiver.java |   5 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java    |  12 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  21 +++-
 .../hadoop/hdfs/TestDistributedFileSystem.java     | 131 ++++++++++++++++++++-
 .../hdfs/server/datanode/SimulatedFSDataset.java   |   6 +
 .../datanode/extdataset/ExternalDatasetImpl.java   |   6 +
 8 files changed, 228 insertions(+), 34 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index ccc2dfe20688..120083ab671e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -85,6 +85,7 @@ import 
org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
 import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
 import 
org.apache.hadoop.thirdparty.com.google.common.cache.RemovalNotification;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -608,17 +609,17 @@ class DataStreamer extends Daemon {
     this.accessToken = t;
   }
 
-  private void setPipeline(LocatedBlock lb) {
+  protected void setPipeline(LocatedBlock lb) {
     setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
   }
 
-  private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
-                           String[] storageIDs) {
+  protected void setPipeline(DatanodeInfo[] newNodes, StorageType[] 
newStorageTypes,
+                           String[] newStorageIDs) {
     synchronized (nodesLock) {
-      this.nodes = nodes;
+      this.nodes = newNodes;
     }
-    this.storageTypes = storageTypes;
-    this.storageIDs = storageIDs;
+    this.storageTypes = newStorageTypes;
+    this.storageIDs = newStorageIDs;
   }
 
   /**
@@ -713,7 +714,7 @@ class DataStreamer extends Daemon {
 
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
           LOG.debug("Allocating new block: {}", this);
-          setPipeline(nextBlockOutputStream());
+          setupPipelineForCreate();
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
           LOG.debug("Append to block {}", block);
@@ -1519,8 +1520,11 @@ class DataStreamer extends Daemon {
    * it can be written to.
    * This happens when a file is appended or data streaming fails
    * It keeps on trying until a pipeline is setup
+   *
+   * Returns boolean whether pipeline was setup successfully or not.
+   * This boolean is used upstream on whether to continue creating pipeline or 
throw exception
    */
-  private void setupPipelineForAppendOrRecovery() throws IOException {
+  private boolean setupPipelineForAppendOrRecovery() throws IOException {
     // Check number of datanodes. Note that if there is no healthy datanode,
     // this must be internal error because we mark external error in striped
     // outputstream only when all the streamers are in the DATA_STREAMING stage
@@ -1530,33 +1534,46 @@ class DataStreamer extends Daemon {
       LOG.warn(msg);
       lastException.set(new IOException(msg));
       streamerClosed = true;
-      return;
+      return false;
     }
-    setupPipelineInternal(nodes, storageTypes, storageIDs);
+    return setupPipelineInternal(nodes, storageTypes, storageIDs);
   }
 
-  protected void setupPipelineInternal(DatanodeInfo[] datanodes,
+  protected boolean setupPipelineInternal(DatanodeInfo[] datanodes,
       StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
       throws IOException {
     boolean success = false;
     long newGS = 0L;
+    boolean isCreateStage = BlockConstructionStage.PIPELINE_SETUP_CREATE == 
stage;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
-        return;
+        return false;
       }
 
-      final boolean isRecovery = errorState.hasInternalError();
+      final boolean isRecovery = errorState.hasInternalError() && 
!isCreateStage;
+
+
       if (!handleBadDatanode()) {
-        return;
+        return false;
       }
 
       handleDatanodeReplacement();
 
+      // During create stage, min replication should still be satisfied.
+      if (isCreateStage && !(dfsClient.dtpReplaceDatanodeOnFailureReplication 
> 0 &&
+          nodes.length  >= dfsClient.dtpReplaceDatanodeOnFailureReplication)) {
+        return false;
+      }
+
       // get a new generation stamp and an access token
       final LocatedBlock lb = updateBlockForPipeline();
       newGS = lb.getBlock().getGenerationStamp();
       accessToken = lb.getBlockToken();
 
+      if (isCreateStage) {
+        block.setCurrentBlock(lb.getBlock());
+      }
+
       // set up the pipeline again with the remaining nodes
       success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS,
           isRecovery);
@@ -1569,6 +1586,7 @@ class DataStreamer extends Daemon {
     if (success) {
       updatePipeline(newGS);
     }
+    return success;
   }
 
   /**
@@ -1707,7 +1725,7 @@ class DataStreamer extends Daemon {
    * Must get block ID and the IDs of the destinations from the namenode.
    * Returns the list of target datanodes.
    */
-  protected LocatedBlock nextBlockOutputStream() throws IOException {
+  protected void setupPipelineForCreate() throws IOException {
     LocatedBlock lb;
     DatanodeInfo[] nodes;
     StorageType[] nextStorageTypes;
@@ -1718,7 +1736,7 @@ class DataStreamer extends Daemon {
     do {
       errorState.resetInternalError();
       lastException.clear();
-
+      streamerClosed = false;
       DatanodeInfo[] excluded = getExcludedNodes();
       lb = locateFollowingBlock(
           excluded.length > 0 ? excluded : null, oldBlock);
@@ -1729,26 +1747,33 @@ class DataStreamer extends Daemon {
       nodes = lb.getLocations();
       nextStorageTypes = lb.getStorageTypes();
       nextStorageIDs = lb.getStorageIDs();
+      setPipeline(lb);
+      try {
+        // Connect to first DataNode in the list.
+        success = createBlockOutputStream(nodes, nextStorageTypes, 
nextStorageIDs, 0L, false)
+            || setupPipelineForAppendOrRecovery();
 
-      // Connect to first DataNode in the list.
-      success = createBlockOutputStream(nodes, nextStorageTypes, 
nextStorageIDs,
-          0L, false);
-
+      } catch(IOException ie) {
+        LOG.warn("Exception in setupPipelineForCreate " + this, ie);
+        success = false;
+      }
       if (!success) {
         LOG.warn("Abandoning " + block);
         dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
             stat.getFileId(), src, dfsClient.clientName);
         block.setCurrentBlock(null);
-        final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
+        final DatanodeInfo badNode = errorState.getBadNodeIndex() == -1
+              ? Iterables.getLast(failed)
+              : nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
+        setPipeline(null, null, null);
       }
     } while (!success && --count >= 0);
 
     if (!success) {
       throw new IOException("Unable to create new block.");
     }
-    return lb;
   }
 
   // connects to the first datanode in the pipeline
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index e90e66ace498..a903f02634ac 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -90,7 +90,7 @@ public class StripedDataStreamer extends DataStreamer {
   }
 
   @Override
-  protected LocatedBlock nextBlockOutputStream() throws IOException {
+  protected void setupPipelineForCreate() throws IOException {
     boolean success;
     LocatedBlock lb = getFollowingBlock();
     block.setCurrentBlock(lb.getBlock());
@@ -101,7 +101,6 @@ public class StripedDataStreamer extends DataStreamer {
     DatanodeInfo[] nodes = lb.getLocations();
     StorageType[] storageTypes = lb.getStorageTypes();
     String[] storageIDs = lb.getStorageIDs();
-
     // Connect to the DataNode. If fail the internal error state will be set.
     success = createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,
         false);
@@ -113,7 +112,7 @@ public class StripedDataStreamer extends DataStreamer {
       excludedNodes.put(badNode, badNode);
       throw new IOException("Unable to create new block." + this);
     }
-    return lb;
+    setPipeline(lb);
   }
 
   @VisibleForTesting
@@ -122,18 +121,18 @@ public class StripedDataStreamer extends DataStreamer {
   }
 
   @Override
-  protected void setupPipelineInternal(DatanodeInfo[] nodes,
+  protected boolean setupPipelineInternal(DatanodeInfo[] nodes,
       StorageType[] nodeStorageTypes, String[] nodeStorageIDs)
       throws IOException {
     boolean success = false;
     while (!success && !streamerClosed() && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
-        return;
+        return false;
       }
       if (!handleBadDatanode()) {
         // for striped streamer if it is datanode error then close the stream
         // and return. no need to replace datanode
-        return;
+        return false;
       }
 
       // get a new generation stamp and an access token
@@ -179,6 +178,7 @@ public class StripedDataStreamer extends DataStreamer {
         setStreamerAsClosed();
       }
     } // while
+    return success;
   }
 
   void setExternalError() {
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index edc22e8bbcb5..76d6d6782613 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -217,7 +217,10 @@ class BlockReceiver implements Closeable {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
           replicaHandler = datanode.data.createRbw(storageType, storageId,
-              block, allowLazyPersist);
+              block, allowLazyPersist, newGs);
+          if (newGs != 0L) {
+            block.setGenerationStamp(newGs);
+          }
           datanode.notifyNamenodeReceivingBlock(
               block, replicaHandler.getReplica().getStorageUuid());
           break;
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4bfb0cb870cd..3b8bc48b1b0e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -333,6 +333,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> 
extends FSDatasetMBean {
   ReplicaHandler createRbw(StorageType storageType, String storageId,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
+  /**
+   * Creates a RBW replica and returns the meta info of the replica
+   *
+   * @param b block
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  ReplicaHandler createRbw(StorageType storageType, String storageId,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws 
IOException;
+
   /**
    * Recovers a RBW replica and returns the meta info of the replica.
    * 
@@ -466,7 +476,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> 
extends FSDatasetMBean {
   boolean isValidRbw(ExtendedBlock b);
 
   /**
-   * Invalidates the specified blocks
+   * Invalidates the specified blocks.
    * @param bpid Block pool Id
    * @param invalidBlks - the blocks to be invalidated
    * @throws IOException
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 054c2c347d43..cf9490e7acee 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1438,13 +1438,27 @@ class FsDatasetImpl implements 
FsDatasetSpi<FsVolumeImpl> {
   public ReplicaHandler createRbw(
       StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
+    return createRbw(storageType, storageId, b, allowLazyPersist, 0L);
+  }
+
+  @Override // FsDatasetSpi
+  public ReplicaHandler createRbw(
+      StorageType storageType, String storageId, ExtendedBlock b,
+      boolean allowLazyPersist, long newGS) throws IOException {
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
       if (replicaInfo != null) {
-        throw new ReplicaAlreadyExistsException("Block " + b +
-            " already exists in state " + replicaInfo.getState() +
-            " and thus cannot be created.");
+        // In case of retries with same blockPoolId + blockId as before
+        // with updated GS, cleanup the old replica to avoid
+        // any multiple copies with same blockPoolId + blockId
+        if (newGS != 0L) {
+          cleanupReplica(b.getBlockPoolId(), replicaInfo);
+        } else {
+          throw new ReplicaAlreadyExistsException("Block " + b +
+              " already exists in state " + replicaInfo.getState() +
+              " and thus cannot be created.");
+        }
       }
       // create a new block
       FsVolumeReference ref = null;
@@ -1501,6 +1515,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> 
{
     }
   }
 
+
   @Override // FsDatasetSpi
   public ReplicaHandler recoverRbw(
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 3d5adaf8b3f9..9ae5ae9ad1f2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -101,6 +101,8 @@ import 
org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -2164,4 +2166,131 @@ public class TestDistributedFileSystem {
       assertFalse(result.isSupported());
     }
   }
-}
+
+  @Test
+  public void testSingleRackFailureDuringPipelineSetupMinReplicationPossible() 
throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill one DN, so only 2 racks stays with active DN
+      cluster.stopDataNode(0);
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 
1024L);
+    }
+  }
+
+  @Test
+  public void 
testSingleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class, 
BlockPlacementPolicy.class);
+    
conf.setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
 false);
+    
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION,
 3);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill one DN, so only 2 racks stays with active DN
+      cluster.stopDataNode(0);
+      LambdaTestUtils.intercept(IOException.class,
+          () ->
+              DFSTestUtil.createFile(fs, new Path("/testFile"),
+                  1024L, (short) 3, 1024L));
+    }
+  }
+
+  @Test
+  public void 
testMultipleRackFailureDuringPipelineSetupMinReplicationPossible() throws 
Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 1);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill 2 DN, so only 1 racks stays with active DN
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(1);
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 racks.
+      DFSTestUtil.createFile(fs, new Path("/testFile"), 1024L, (short) 3, 
1024L);
+    }
+  }
+
+  @Test
+  public void 
testMultipleRackFailureDuringPipelineSetupMinReplicationImpossible()
+      throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // kill 2 DN, so only 1 rack stays with active DN
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(1);
+      LambdaTestUtils.intercept(IOException.class,
+          () ->
+              DFSTestUtil.createFile(fs, new Path("/testFile"),
+                  1024L, (short) 3, 1024L));
+    }
+  }
+
+  @Test
+  public void testAllRackFailureDuringPipelineSetup() throws Exception {
+    Configuration conf = getTestConfiguration();
+    conf.setClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        false);
+    // 3 racks & 3 nodes. 1 per rack
+    try (MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .racks(new String[] {"/rack1", "/rack2", "/rack3"}).build()) {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // shutdown all DNs
+      cluster.shutdownDataNodes();
+      // create a file with replication 3, for rack fault tolerant BPP,
+      // it should allocate nodes in all 3 rack but fail because no DNs are 
present.
+      LambdaTestUtils.intercept(IOException.class,
+          () ->
+          DFSTestUtil.createFile(fs, new Path("/testFile"),
+              1024L, (short) 3, 1024L));
+    }
+  }
+
+}
\ No newline at end of file
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index ffb6026a97b7..acfadca50720 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1166,6 +1166,12 @@ public class SimulatedFSDataset implements 
FsDatasetSpi<FsVolumeSpi> {
     return createTemporary(storageType, storageId, b, false);
   }
 
+  @Override
+  public ReplicaHandler createRbw(StorageType storageType, String storageId,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws 
IOException {
+    return createRbw(storageType, storageId, b, allowLazyPersist);
+  }
+
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler createTemporary(StorageType storageType,
       String storageId, ExtendedBlock b, boolean isTransfer)
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index cf91867b4f3b..3d3c3015aea2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -151,6 +151,12 @@ public class ExternalDatasetImpl implements 
FsDatasetSpi<ExternalVolumeImpl> {
     return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
+  @Override
+  public ReplicaHandler createRbw(StorageType storageType, String storageId,
+      ExtendedBlock b, boolean allowLazyPersist, long newGS) throws 
IOException {
+    return createRbw(storageType, storageId, b, allowLazyPersist);
+  }
+
   @Override
   public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS,
       long minBytesRcvd, long maxBytesRcvd) throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to