HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in 
commitBlock. Contributed by Chang Li.

Change-Id: If5ce1b2d212bb0726bce52ad12a3de401bcec02d


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dac0463a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dac0463a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dac0463a

Branch: refs/heads/HDFS-7240
Commit: dac0463a4e20dfb3a802355919fc22b8e017a4e1
Parents: 7e28296
Author: Zhe Zhang <z...@apache.org>
Authored: Tue Nov 3 13:34:05 2015 -0800
Committer: Zhe Zhang <z...@apache.org>
Committed: Tue Nov 3 13:34:24 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DataStreamer.java    |   2 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/blockmanagement/BlockInfo.java  |   2 +-
 .../server/blockmanagement/BlockManager.java    |   4 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  27 +++--
 .../TestCommitBlockWithInvalidGenStamp.java     | 100 +++++++++++++++++++
 .../namenode/TestQuotaWithStripedBlocks.java    |   4 +-
 7 files changed, 128 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dac0463a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 03c2c52..7cb89c5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -351,7 +351,7 @@ class DataStreamer extends Daemon {
   }
 
   private volatile boolean streamerClosed = false;
-  protected ExtendedBlock block; // its length is number of bytes acked
+  protected volatile ExtendedBlock block; // its length is number of bytes 
acked
   protected Token<BlockTokenIdentifier> accessToken;
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dac0463a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fbf211f..13c4094 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2225,6 +2225,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9362. TestAuditLogger#testAuditLoggerWithCallContext assumes Unix line
     endings, fails on Windows. (cnauroth)
 
+    HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in
+    commitBlock. (Chang Li via zhz)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dac0463a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index e15b5ee..e9fa123 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -411,7 +411,7 @@ public abstract class BlockInfo extends Block
     }
     Preconditions.checkState(!isComplete());
     uc.commit();
-    this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+    this.setNumBytes(block.getNumBytes());
     // Sort out invalid replicas.
     setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dac0463a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index dbe0726..3c6c4d3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -653,6 +653,10 @@ public class BlockManager implements BlockStatsMXBean {
     assert block.getNumBytes() <= commitBlock.getNumBytes() :
         "commitBlock length is less than the stored one "
             + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
+    if(block.getGenerationStamp() != commitBlock.getGenerationStamp()) {
+      throw new IOException("Commit block with mismatching GS. NN has " +
+          block + ", client submits " + commitBlock);
+    }
     block.commitBlock(commitBlock);
     return true;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dac0463a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 86b89a2..80c89e9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1917,9 +1917,9 @@ public class DFSTestUtil {
 
       ExtendedBlock previous = null;
       for (int i = 0; i < numBlocks; i++) {
-        Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns,
+        Block newBlock = addBlockToFile(true, cluster.getDataNodes(), dfs, ns,
             file.toString(), fileNode, dfs.getClient().getClientName(),
-            previous, numStripesPerBlk);
+            previous, numStripesPerBlk, 0);
         previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
       }
 
@@ -1931,18 +1931,22 @@ public class DFSTestUtil {
   }
 
   /**
-   * Adds a striped block group to a file. This method only manipulates 
NameNode
+   * Adds a block or a striped block group to a file.
+   * This method only manipulates NameNode
    * states of the file and the block without injecting data to DataNode.
    * It does mimic block reports.
    * You should disable periodical heartbeat before use this.
+   * @param isStripedBlock a boolean tell if the block added a striped block
    * @param dataNodes List DataNodes to host the striped block group
    * @param previous Previous block in the file
    * @param numStripes Number of stripes in each block group
-   * @return The added block group
+   * @param len block size for a non striped block added
+   * @return The added block or block group
    */
-  public static Block addStripedBlockToFile(List<DataNode> dataNodes,
-      DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile 
fileNode,
-      String clientName, ExtendedBlock previous, int numStripes)
+  public static Block addBlockToFile(boolean isStripedBlock,
+      List<DataNode> dataNodes, DistributedFileSystem fs, FSNamesystem ns,
+      String file, INodeFile fileNode,
+      String clientName, ExtendedBlock previous, int numStripes, int len)
       throws Exception {
     fs.getClient().namenode.addBlock(file, clientName, previous, null,
         fileNode.getId(), null);
@@ -1965,10 +1969,12 @@ public class DFSTestUtil {
     }
 
     // 2. RECEIVED_BLOCK IBR
+    long blockSize = isStripedBlock ?
+        numStripes * BLOCK_STRIPED_CELL_SIZE : len;
     for (int i = 0; i < groupSize; i++) {
       DataNode dn = dataNodes.get(i);
       final Block block = new Block(lastBlock.getBlockId() + i,
-          numStripes * BLOCK_STRIPED_CELL_SIZE, 
lastBlock.getGenerationStamp());
+          blockSize, lastBlock.getGenerationStamp());
       DatanodeStorage storage = new 
DatanodeStorage(UUID.randomUUID().toString());
       StorageReceivedDeletedBlocks[] reports = DFSTestUtil
           .makeReportForReceivedBlock(block,
@@ -1977,8 +1983,9 @@ public class DFSTestUtil {
         ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
       }
     }
-
-    lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * 
NUM_DATA_BLOCKS);
+    long bytes = isStripedBlock ?
+        numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS : len;
+    lastBlock.setNumBytes(bytes);
     return lastBlock;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dac0463a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
new file mode 100644
index 0000000..d2b2b5a
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+public class TestCommitBlockWithInvalidGenStamp {
+  private static final int BLOCK_SIZE = 1024;
+  private MiniDFSCluster cluster;
+  private FSDirectory dir;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setUp() throws IOException {
+    final Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+
+    dir = cluster.getNamesystem().getFSDirectory();
+    dfs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testCommitWithInvalidGenStamp() throws Exception {
+    final Path file = new Path("/file");
+    FSDataOutputStream out = null;
+
+    try {
+      out = dfs.create(file, (short) 1);
+      INodeFile fileNode = dir.getINode4Write(file.toString()).asFile();
+      ExtendedBlock previous = null;
+
+      Block newBlock = DFSTestUtil.addBlockToFile(false, 
cluster.getDataNodes(),
+          dfs, cluster.getNamesystem(), file.toString(), fileNode,
+          dfs.getClient().getClientName(), previous, 0, 100);
+      Block newBlockClone = new Block(newBlock);
+      previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+          newBlockClone);
+
+      previous.setGenerationStamp(123);
+      try{
+        dfs.getClient().getNamenode().complete(file.toString(),
+            dfs.getClient().getClientName(), previous, fileNode.getId());
+        Assert.fail("should throw exception because invalid genStamp");
+      } catch (IOException e) {
+        Assert.assertTrue(e.toString().contains(
+            "Commit block with mismatching GS. NN has " +
+            newBlock + ", client submits " + newBlockClone));
+      }
+      previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+          newBlock);
+      boolean complete =  
dfs.getClient().getNamenode().complete(file.toString(),
+      dfs.getClient().getClientName(), previous, fileNode.getId());
+      Assert.assertTrue("should complete successfully", complete);
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dac0463a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java
index 26f9b8e..08a1744 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java
@@ -90,9 +90,9 @@ public class TestQuotaWithStripedBlocks {
       INodeFile fileNode = dir.getINode4Write(file.toString()).asFile();
       ExtendedBlock previous = null;
       // Create striped blocks which have a cell in each block.
-      Block newBlock = 
DFSTestUtil.addStripedBlockToFile(cluster.getDataNodes(),
+      Block newBlock = DFSTestUtil.addBlockToFile(true, cluster.getDataNodes(),
           dfs, cluster.getNamesystem(), file.toString(), fileNode,
-          dfs.getClient().getClientName(), previous, 1);
+          dfs.getClient().getClientName(), previous, 1, 0);
       previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
           newBlock);
 

Reply via email to