HDFS-7912. Erasure Coding: track BlockInfo instead of Block in UnderReplicatedBlocks and PendingReplicationBlocks. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c0e02ae Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c0e02ae Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c0e02ae Branch: refs/heads/HDFS-7285 Commit: 4c0e02ae72374e86782ec4851b80cf444103ef6d Parents: 227cffd Author: Jing Zhao <ji...@apache.org> Authored: Tue Mar 17 10:18:50 2015 -0700 Committer: Zhe Zhang <z...@apache.org> Committed: Tue Mar 24 11:16:34 2015 -0700 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 47 ++++++++--------- .../PendingReplicationBlocks.java | 51 +++++++++---------- .../blockmanagement/UnderReplicatedBlocks.java | 49 +++++++++--------- .../hdfs/server/namenode/FSDirAttrOp.java | 10 ++-- .../hdfs/server/namenode/FSNamesystem.java | 21 ++++---- .../hadoop/hdfs/server/namenode/INode.java | 12 ++--- .../hadoop/hdfs/server/namenode/INodeFile.java | 4 +- .../hdfs/server/namenode/NamenodeFsck.java | 10 ++-- .../hadoop/hdfs/server/namenode/SafeMode.java | 3 +- .../blockmanagement/BlockManagerTestUtil.java | 5 +- .../blockmanagement/TestBlockManager.java | 8 +-- .../server/blockmanagement/TestNodeCount.java | 3 +- .../TestOverReplicatedBlocks.java | 5 +- .../blockmanagement/TestPendingReplication.java | 19 ++++--- .../TestRBWBlockInvalidation.java | 4 +- .../blockmanagement/TestReplicationPolicy.java | 53 +++++++++++--------- .../TestUnderReplicatedBlockQueues.java | 16 +++--- .../datanode/TestReadOnlySharedStorage.java | 9 ++-- .../namenode/TestProcessCorruptBlocks.java | 5 +- 19 files changed, 180 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index a6bccc1..f7a00f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1336,7 +1336,7 @@ public class BlockManager { * @return number of blocks scheduled for replication during this iteration. */ int computeReplicationWork(int blocksToProcess) { - List<List<Block>> blocksToReplicate = null; + List<List<BlockInfo>> blocksToReplicate = null; namesystem.writeLock(); try { // Choose the blocks to be replicated @@ -1354,7 +1354,7 @@ public class BlockManager { * @return the number of blocks scheduled for replication */ @VisibleForTesting - int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) { + int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) { int requiredReplication, numEffectiveReplicas; List<DatanodeDescriptor> containingNodes; DatanodeDescriptor srcNode; @@ -1368,7 +1368,7 @@ public class BlockManager { try { synchronized (neededReplications) { for (int priority = 0; priority < blocksToReplicate.size(); priority++) { - for (Block block : blocksToReplicate.get(priority)) { + for (BlockInfo block : blocksToReplicate.get(priority)) { // block should belong to a file bc = blocksMap.getBlockCollection(block); // abandoned block or block reopened for append @@ -1452,7 +1452,7 @@ public class BlockManager { } synchronized (neededReplications) { - Block block = rw.block; + BlockInfo block = rw.block; int priority = rw.priority; // Recheck since global lock was released // block should belong to a file @@ -1710,7 +1710,7 @@ public class BlockManager { * and put them back into the neededReplication queue */ private void processPendingReplications() { - Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); + BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks(); if (timedOutItems != null) { namesystem.writeLock(); try { @@ -2883,13 +2883,13 @@ public class BlockManager { /** Set replication for the blocks. */ public void setReplication(final short oldRepl, final short newRepl, - final String src, final Block... blocks) { + final String src, final BlockInfoContiguous... blocks) { if (newRepl == oldRepl) { return; } // update needReplication priority queues - for(Block b : blocks) { + for(BlockInfoContiguous b : blocks) { updateNeededReplications(b, 0, newRepl-oldRepl); } @@ -2897,7 +2897,7 @@ public class BlockManager { // old replication > the new one; need to remove copies LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl + " for " + src); - for(Block b : blocks) { + for(BlockInfoContiguous b : blocks) { processOverReplicatedBlock(b, newRepl, null, null); } } else { // replication factor is increased @@ -3069,7 +3069,8 @@ public class BlockManager { blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node); assert (namesystem.hasWriteLock()); { - if (!blocksMap.removeNode(block, node)) { + BlockInfo storedBlock = getStoredBlock(block); + if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) { blockLog.debug("BLOCK* removeStoredBlock: {} has already been" + " removed from node {}", block, node); return; @@ -3083,8 +3084,8 @@ public class BlockManager { // BlockCollection bc = blocksMap.getBlockCollection(block); if (bc != null) { - namesystem.decrementSafeBlockCount(block); - updateNeededReplications(block, -1, 0); + namesystem.decrementSafeBlockCount(storedBlock); + updateNeededReplications(storedBlock, -1, 0); } // @@ -3158,7 +3159,7 @@ public class BlockManager { // // Modify the blocks->datanode map and node's map. // - pendingReplications.decrement(block, node); + pendingReplications.decrement(getStoredBlock(block), node); processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } @@ -3273,7 +3274,7 @@ public class BlockManager { * For a striped block, this includes nodes storing blocks belonging to the * striped block group. */ - public NumberReplicas countNodes(Block b) { + public NumberReplicas countNodes(BlockInfo b) { int decommissioned = 0; int live = 0; int corrupt = 0; @@ -3303,11 +3304,11 @@ public class BlockManager { } /** - * Simpler, faster form of {@link #countNodes(Block)} that only returns the number + * Simpler, faster form of {@link #countNodes} that only returns the number * of live nodes. If in startup safemode (or its 30-sec extension period), * then it gains speed by ignoring issues of excess replicas or nodes * that are decommissioned or in process of becoming decommissioned. - * If not in startup, then it calls {@link #countNodes(Block)} instead. + * If not in startup, then it calls {@link #countNodes} instead. * * @param b - the block being tested * @return count of live nodes for this block @@ -3337,10 +3338,10 @@ public class BlockManager { if (!namesystem.isPopulatingReplQueues()) { return; } - final Iterator<? extends Block> it = srcNode.getBlockIterator(); + final Iterator<BlockInfo> it = srcNode.getBlockIterator(); int numOverReplicated = 0; while(it.hasNext()) { - final Block block = it.next(); + final BlockInfo block = it.next(); BlockCollection bc = blocksMap.getBlockCollection(block); short expectedReplication = bc.getBlockReplication(); NumberReplicas num = countNodes(block); @@ -3399,7 +3400,7 @@ public class BlockManager { return blocksMap.size(); } - public void removeBlock(Block block) { + public void removeBlock(BlockInfo block) { assert namesystem.hasWriteLock(); // No need to ACK blocks that are being removed entirely // from the namespace, since the removal of the associated @@ -3429,7 +3430,7 @@ public class BlockManager { } /** updates a block in under replication queue */ - private void updateNeededReplications(final Block block, + private void updateNeededReplications(final BlockInfo block, final int curReplicasDelta, int expectedReplicasDelta) { namesystem.writeLock(); try { @@ -3461,7 +3462,7 @@ public class BlockManager { */ public void checkReplication(BlockCollection bc) { final short expected = bc.getBlockReplication(); - for (Block block : bc.getBlocks()) { + for (BlockInfo block : bc.getBlocks()) { final NumberReplicas n = countNodes(block); if (isNeededReplication(block, expected, n.liveReplicas())) { neededReplications.add(block, n.liveReplicas(), @@ -3598,7 +3599,7 @@ public class BlockManager { /** * Return an iterator over the set of blocks for which there are no replicas. */ - public Iterator<Block> getCorruptReplicaBlockIterator() { + public Iterator<BlockInfo> getCorruptReplicaBlockIterator() { return neededReplications.iterator( UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS); } @@ -3712,7 +3713,7 @@ public class BlockManager { private static class ReplicationWork { - private final Block block; + private final BlockInfo block; private final BlockCollection bc; private final DatanodeDescriptor srcNode; @@ -3723,7 +3724,7 @@ public class BlockManager { private DatanodeStorageInfo targets[]; private final int priority; - public ReplicationWork(Block block, + public ReplicationWork(BlockInfo block, BlockCollection bc, DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes, http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java index 796b878..04232cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java @@ -23,6 +23,7 @@ import java.io.PrintWriter; import java.sql.Time; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -46,8 +47,8 @@ import org.slf4j.Logger; class PendingReplicationBlocks { private static final Logger LOG = BlockManager.LOG; - private final Map<Block, PendingBlockInfo> pendingReplications; - private final ArrayList<Block> timedOutItems; + private final Map<BlockInfo, PendingBlockInfo> pendingReplications; + private final ArrayList<BlockInfo> timedOutItems; Daemon timerThread = null; private volatile boolean fsRunning = true; @@ -62,8 +63,8 @@ class PendingReplicationBlocks { if ( timeoutPeriod > 0 ) { this.timeout = timeoutPeriod; } - pendingReplications = new HashMap<Block, PendingBlockInfo>(); - timedOutItems = new ArrayList<Block>(); + pendingReplications = new HashMap<>(); + timedOutItems = new ArrayList<>(); } void start() { @@ -76,7 +77,7 @@ class PendingReplicationBlocks { * @param block The corresponding block * @param targets The DataNodes where replicas of the block should be placed */ - void increment(Block block, DatanodeDescriptor[] targets) { + void increment(BlockInfo block, DatanodeDescriptor[] targets) { synchronized (pendingReplications) { PendingBlockInfo found = pendingReplications.get(block); if (found == null) { @@ -93,9 +94,9 @@ class PendingReplicationBlocks { * Decrement the number of pending replication requests * for this block. * - * @param The DataNode that finishes the replication + * @param dn The DataNode that finishes the replication */ - void decrement(Block block, DatanodeDescriptor dn) { + void decrement(BlockInfo block, DatanodeDescriptor dn) { synchronized (pendingReplications) { PendingBlockInfo found = pendingReplications.get(block); if (found != null) { @@ -115,7 +116,7 @@ class PendingReplicationBlocks { * @param block The given block whose pending replication requests need to be * removed */ - void remove(Block block) { + void remove(BlockInfo block) { synchronized (pendingReplications) { pendingReplications.remove(block); } @@ -138,7 +139,7 @@ class PendingReplicationBlocks { /** * How many copies of this block is pending replication? */ - int getNumReplicas(Block block) { + int getNumReplicas(BlockInfo block) { synchronized (pendingReplications) { PendingBlockInfo found = pendingReplications.get(block); if (found != null) { @@ -153,13 +154,13 @@ class PendingReplicationBlocks { * replication requests. Returns null if no blocks have * timed out. */ - Block[] getTimedOutBlocks() { + BlockInfo[] getTimedOutBlocks() { synchronized (timedOutItems) { if (timedOutItems.size() <= 0) { return null; } - Block[] blockList = timedOutItems.toArray( - new Block[timedOutItems.size()]); + BlockInfo[] blockList = timedOutItems.toArray( + new BlockInfo[timedOutItems.size()]); timedOutItems.clear(); return blockList; } @@ -179,7 +180,7 @@ class PendingReplicationBlocks { PendingBlockInfo(DatanodeDescriptor[] targets) { this.timeStamp = monotonicNow(); this.targets = targets == null ? new ArrayList<DatanodeDescriptor>() - : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets)); + : new ArrayList<>(Arrays.asList(targets)); } long getTimeStamp() { @@ -192,9 +193,7 @@ class PendingReplicationBlocks { void incrementReplicas(DatanodeDescriptor... newTargets) { if (newTargets != null) { - for (DatanodeDescriptor dn : newTargets) { - targets.add(dn); - } + Collections.addAll(targets, newTargets); } } @@ -232,17 +231,17 @@ class PendingReplicationBlocks { */ void pendingReplicationCheck() { synchronized (pendingReplications) { - Iterator<Map.Entry<Block, PendingBlockInfo>> iter = + Iterator<Map.Entry<BlockInfo, PendingBlockInfo>> iter = pendingReplications.entrySet().iterator(); long now = monotonicNow(); if(LOG.isDebugEnabled()) { LOG.debug("PendingReplicationMonitor checking Q"); } while (iter.hasNext()) { - Map.Entry<Block, PendingBlockInfo> entry = iter.next(); + Map.Entry<BlockInfo, PendingBlockInfo> entry = iter.next(); PendingBlockInfo pendingBlock = entry.getValue(); if (now > pendingBlock.getTimeStamp() + timeout) { - Block block = entry.getKey(); + BlockInfo block = entry.getKey(); synchronized (timedOutItems) { timedOutItems.add(block); } @@ -275,16 +274,14 @@ class PendingReplicationBlocks { synchronized (pendingReplications) { out.println("Metasave: Blocks being replicated: " + pendingReplications.size()); - Iterator<Map.Entry<Block, PendingBlockInfo>> iter = - pendingReplications.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<Block, PendingBlockInfo> entry = iter.next(); + for (Map.Entry<BlockInfo, PendingBlockInfo> entry : + pendingReplications.entrySet()) { PendingBlockInfo pendingBlock = entry.getValue(); Block block = entry.getKey(); - out.println(block + - " StartTime: " + new Time(pendingBlock.timeStamp) + - " NumReplicaInProgress: " + - pendingBlock.getNumReplicas()); + out.println(block + + " StartTime: " + new Time(pendingBlock.timeStamp) + + " NumReplicaInProgress: " + + pendingBlock.getNumReplicas()); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java index 1daa0ee..f9bce26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -35,7 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; * * <p/> * The policy for choosing which priority to give added blocks - * is implemented in {@link #getPriority(Block, int, int, int)}. + * is implemented in {@link #getPriority(int, int, int)}. * </p> * <p>The queue order is as follows:</p> * <ol> @@ -62,7 +61,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; * blocks that are not corrupt higher priority.</li> * </ol> */ -class UnderReplicatedBlocks implements Iterable<Block> { +class UnderReplicatedBlocks implements Iterable<BlockInfo> { /** The total number of queues : {@value} */ static final int LEVEL = 5; /** The queue with the highest priority: {@value} */ @@ -78,8 +77,8 @@ class UnderReplicatedBlocks implements Iterable<Block> { /** The queue for corrupt blocks: {@value} */ static final int QUEUE_WITH_CORRUPT_BLOCKS = 4; /** the queues themselves */ - private final List<LightWeightLinkedSet<Block>> priorityQueues - = new ArrayList<LightWeightLinkedSet<Block>>(LEVEL); + private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues + = new ArrayList<>(LEVEL); /** The number of corrupt blocks with replication factor 1 */ private int corruptReplOneBlocks = 0; @@ -87,7 +86,7 @@ class UnderReplicatedBlocks implements Iterable<Block> { /** Create an object. */ UnderReplicatedBlocks() { for (int i = 0; i < LEVEL; i++) { - priorityQueues.add(new LightWeightLinkedSet<Block>()); + priorityQueues.add(new LightWeightLinkedSet<BlockInfo>()); } } @@ -131,8 +130,8 @@ class UnderReplicatedBlocks implements Iterable<Block> { } /** Check if a block is in the neededReplication queue */ - synchronized boolean contains(Block block) { - for(LightWeightLinkedSet<Block> set : priorityQueues) { + synchronized boolean contains(BlockInfo block) { + for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) { if (set.contains(block)) { return true; } @@ -141,13 +140,11 @@ class UnderReplicatedBlocks implements Iterable<Block> { } /** Return the priority of a block - * @param block a under replicated block * @param curReplicas current number of replicas of the block * @param expectedReplicas expected number of replicas of the block * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1) */ - private int getPriority(Block block, - int curReplicas, + private int getPriority(int curReplicas, int decommissionedReplicas, int expectedReplicas) { assert curReplicas >= 0 : "Negative replicas!"; @@ -183,12 +180,12 @@ class UnderReplicatedBlocks implements Iterable<Block> { * @param expectedReplicas expected number of replicas of the block * @return true if the block was added to a queue. */ - synchronized boolean add(Block block, + synchronized boolean add(BlockInfo block, int curReplicas, int decomissionedReplicas, int expectedReplicas) { assert curReplicas >= 0 : "Negative replicas!"; - int priLevel = getPriority(block, curReplicas, decomissionedReplicas, + int priLevel = getPriority(curReplicas, decomissionedReplicas, expectedReplicas); if(priorityQueues.get(priLevel).add(block)) { if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && @@ -207,11 +204,11 @@ class UnderReplicatedBlocks implements Iterable<Block> { } /** remove a block from a under replication queue */ - synchronized boolean remove(Block block, + synchronized boolean remove(BlockInfo block, int oldReplicas, int decommissionedReplicas, int oldExpectedReplicas) { - int priLevel = getPriority(block, oldReplicas, + int priLevel = getPriority(oldReplicas, decommissionedReplicas, oldExpectedReplicas); boolean removedBlock = remove(block, priLevel); @@ -241,7 +238,7 @@ class UnderReplicatedBlocks implements Iterable<Block> { * @param priLevel expected privilege level * @return true if the block was found and removed from one of the priority queues */ - boolean remove(Block block, int priLevel) { + boolean remove(BlockInfo block, int priLevel) { if(priLevel >= 0 && priLevel < LEVEL && priorityQueues.get(priLevel).remove(block)) { NameNode.blockStateChangeLog.debug( @@ -279,14 +276,14 @@ class UnderReplicatedBlocks implements Iterable<Block> { * @param curReplicasDelta the change in the replicate count from before * @param expectedReplicasDelta the change in the expected replica count from before */ - synchronized void update(Block block, int curReplicas, + synchronized void update(BlockInfo block, int curReplicas, int decommissionedReplicas, int curExpectedReplicas, int curReplicasDelta, int expectedReplicasDelta) { int oldReplicas = curReplicas-curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; - int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas); - int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas); + int curPri = getPriority(curReplicas, decommissionedReplicas, curExpectedReplicas); + int oldPri = getPriority(oldReplicas, decommissionedReplicas, oldExpectedReplicas); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + block + @@ -336,12 +333,12 @@ class UnderReplicatedBlocks implements Iterable<Block> { * @return Return a list of block lists to be replicated. The block list index * represents its replication priority. */ - public synchronized List<List<Block>> chooseUnderReplicatedBlocks( + public synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks( int blocksToProcess) { // initialize data structure for the return value - List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL); + List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL); for (int i = 0; i < LEVEL; i++) { - blocksToReplicate.add(new ArrayList<Block>()); + blocksToReplicate.add(new ArrayList<BlockInfo>()); } if (size() == 0) { // There are no blocks to collect. @@ -364,7 +361,7 @@ class UnderReplicatedBlocks implements Iterable<Block> { // Loop through all remaining blocks in the list. while (blockCount < blocksToProcess && neededReplicationsIterator.hasNext()) { - Block block = neededReplicationsIterator.next(); + BlockInfo block = neededReplicationsIterator.next(); blocksToReplicate.get(priority).add(block); blockCount++; } @@ -396,10 +393,10 @@ class UnderReplicatedBlocks implements Iterable<Block> { /** * An iterator over blocks. */ - class BlockIterator implements Iterator<Block> { + class BlockIterator implements Iterator<BlockInfo> { private int level; private boolean isIteratorForLevel = false; - private final List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>(); + private final List<Iterator<BlockInfo>> iterators = new ArrayList<>(); /** * Construct an iterator over all queues. @@ -431,7 +428,7 @@ class UnderReplicatedBlocks implements Iterable<Block> { } @Override - public Block next() { + public BlockInfo next() { if (isIteratorForLevel) { return iterators.get(0).next(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java index a3881b8..eefbcce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.util.EnumCounters; @@ -148,8 +150,8 @@ public class FSDirAttrOp { } final short[] blockRepls = new short[2]; // 0: old, 1: new - final Block[] blocks = unprotectedSetReplication(fsd, src, replication, - blockRepls); + final BlockInfoContiguous[] blocks = unprotectedSetReplication(fsd, src, + replication, blockRepls); isFile = blocks != null; if (isFile) { fsd.getEditLog().logSetReplication(src, replication); @@ -375,7 +377,7 @@ public class FSDirAttrOp { } } - static Block[] unprotectedSetReplication( + static BlockInfoContiguous[] unprotectedSetReplication( FSDirectory fsd, String src, short replication, short[] blockRepls) throws QuotaExceededException, UnresolvedLinkException, SnapshotAccessControlException { @@ -410,7 +412,7 @@ public class FSDirAttrOp { blockRepls[0] = oldBR; blockRepls[1] = newBR; } - return file.getBlocks(); + return file.getContiguousBlocks(); } static void unprotectedSetStoragePolicy( http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index bea3bc8..80fdd68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3730,8 +3730,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * of blocks that need to be removed from blocksMap */ void removeBlocks(BlocksMapUpdateInfo blocks) { - List<Block> toDeleteList = blocks.getToDeleteList(); - Iterator<Block> iter = toDeleteList.iterator(); + List<BlockInfo> toDeleteList = blocks.getToDeleteList(); + Iterator<BlockInfo> iter = toDeleteList.iterator(); while (iter.hasNext()) { writeLock(); try { @@ -3786,12 +3786,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean trackBlockCounts = isSafeModeTrackingBlocks(); int numRemovedComplete = 0, numRemovedSafe = 0; - for (Block b : blocks.getToDeleteList()) { + for (BlockInfo b : blocks.getToDeleteList()) { if (trackBlockCounts) { - BlockInfo bi = getStoredBlock(b); - if (bi.isComplete()) { + if (b.isComplete()) { numRemovedComplete++; - if (blockManager.checkMinStorage(bi, bi.numNodes())) { + if (blockManager.checkMinStorage(b, b.numNodes())) { numRemovedSafe++; } } @@ -4720,7 +4719,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, writeLock(); try { - final Iterator<Block> it = blockManager.getCorruptReplicaBlockIterator(); + final Iterator<BlockInfo> it = + blockManager.getCorruptReplicaBlockIterator(); while (it.hasNext()) { Block b = it.next(); @@ -5668,7 +5668,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } @Override - public void decrementSafeBlockCount(Block b) { + public void decrementSafeBlockCount(BlockInfo b) { // safeMode is volatile, and may be set to null at any time SafeModeInfo safeMode = this.safeMode; if (safeMode == null) // mostly true @@ -6533,7 +6533,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } // print a limited # of corrupt files per call - final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator(); + final Iterator<BlockInfo> blkIterator = + blockManager.getCorruptReplicaBlockIterator(); int skip = getIntCookie(cookieTab[0]); for (int i = 0; i < skip && blkIterator.hasNext(); i++) { @@ -6541,7 +6542,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } while (blkIterator.hasNext()) { - Block blk = blkIterator.next(); + BlockInfo blk = blkIterator.next(); final INode inode = (INode)blockManager.getBlockCollection(blk); skip++; if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index 8c4e466..db3400c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -30,9 +30,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName; @@ -799,16 +799,16 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> { /** * The list of blocks that need to be removed from blocksMap */ - private final List<Block> toDeleteList; + private final List<BlockInfo> toDeleteList; public BlocksMapUpdateInfo() { - toDeleteList = new ChunkedArrayList<Block>(); + toDeleteList = new ChunkedArrayList<>(); } /** * @return The list of blocks that need to be removed from blocksMap */ - public List<Block> getToDeleteList() { + public List<BlockInfo> getToDeleteList() { return toDeleteList; } @@ -817,12 +817,12 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> { * {@link BlocksMapUpdateInfo#toDeleteList} * @param toDelete the to-be-deleted block */ - public void addDeleteBlock(Block toDelete) { + public void addDeleteBlock(BlockInfo toDelete) { assert toDelete != null : "toDelete is null"; toDeleteList.add(toDelete); } - public void removeDeleteBlock(Block block) { + public void removeDeleteBlock(BlockInfo block) { assert block != null : "block is null"; toDeleteList.remove(block); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index a8ab3ce..3474c09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -1056,8 +1056,8 @@ public class INodeFile extends INodeWithAdditionalFields getDiffs().findEarlierSnapshotBlocks(snapshotId); if(snapshotBlocks == null) return; - List<Block> toDelete = collectedBlocks.getToDeleteList(); - for(Block blk : snapshotBlocks) { + List<BlockInfo> toDelete = collectedBlocks.getToDeleteList(); + for(BlockInfo blk : snapshotBlocks) { if(toDelete.contains(blk)) collectedBlocks.removeDeleteBlock(blk); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 92e9f08..4cf2185 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -244,7 +244,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { } BlockCollection bc = bm.getBlockCollection(blockInfo); INode iNode = (INode) bc; - NumberReplicas numberReplicas= bm.countNodes(block); + NumberReplicas numberReplicas= bm.countNodes(blockInfo); out.println("Block Id: " + blockId); out.println("Block belongs to: "+iNode.getFullPathName()); out.println("No. of Expected Replica: " + bc.getBlockReplication()); @@ -459,7 +459,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { // Get block locations without updating the file access time // and without block access tokens LocatedBlocks blocks = null; - FSNamesystem fsn = namenode.getNamesystem(); + final FSNamesystem fsn = namenode.getNamesystem(); fsn.readLock(); try { blocks = fsn.getBlockLocations(path, 0, fileLen, false, false).blocks; @@ -507,8 +507,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { boolean isCorrupt = lBlk.isCorrupt(); String blkName = block.toString(); DatanodeInfo[] locs = lBlk.getLocations(); - NumberReplicas numberReplicas = - namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock()); + final BlockManager blockManager = fsn.getBlockManager(); + final BlockInfo storedBlock = blockManager.getStoredBlock( + block.getLocalBlock()); + NumberReplicas numberReplicas = blockManager.countNodes(storedBlock); int liveReplicas = numberReplicas.liveReplicas(); res.totalReplicas += liveReplicas; short targetFileReplication = file.getReplication(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java index 95fc06b..0debb1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; /** SafeMode related operations. */ @InterfaceAudience.Private @@ -49,5 +50,5 @@ public interface SafeMode { public void incrementSafeBlockCount(int replication); /** Decrement number of blocks that reached minimal replication. */ - public void decrementSafeBlockCount(Block b); + public void decrementSafeBlockCount(BlockInfo b); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 23e610f..148135b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -69,9 +69,10 @@ public class BlockManagerTestUtil { final BlockManager bm = namesystem.getBlockManager(); namesystem.readLock(); try { + final BlockInfo storedBlock = bm.getStoredBlock(b); return new int[]{getNumberOfRacks(bm, b), - bm.countNodes(b).liveReplicas(), - bm.neededReplications.contains(b) ? 1 : 0}; + bm.countNodes(storedBlock).liveReplicas(), + bm.neededReplications.contains(storedBlock) ? 1 : 0}; } finally { namesystem.readUnlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index c16ceaa..ca7055c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -435,14 +435,14 @@ public class TestBlockManager { return blockInfo; } - private DatanodeStorageInfo[] scheduleSingleReplication(Block block) { + private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) { // list for priority 1 - List<Block> list_p1 = new ArrayList<Block>(); + List<BlockInfo> list_p1 = new ArrayList<>(); list_p1.add(block); // list of lists for each priority - List<List<Block>> list_all = new ArrayList<List<Block>>(); - list_all.add(new ArrayList<Block>()); // for priority 0 + List<List<BlockInfo>> list_all = new ArrayList<>(); + list_all.add(new ArrayList<BlockInfo>()); // for priority 0 list_all.add(list_p1); // for priority 1 assertEquals("Block not initially pending replication", 0, http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java index c3726f2..1c3f075 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java @@ -166,10 +166,11 @@ public class TestNodeCount { /* threadsafe read of the replication counts for this block */ NumberReplicas countNodes(Block block, FSNamesystem namesystem) { + BlockManager blockManager = namesystem.getBlockManager(); namesystem.readLock(); try { lastBlock = block; - lastNum = namesystem.getBlockManager().countNodes(block); + lastNum = blockManager.countNodes(blockManager.getStoredBlock(block)); return lastNum; } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java index a86b573..2d7bb44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java @@ -117,7 +117,8 @@ public class TestOverReplicatedBlocks { // corrupt one won't be chosen to be excess one // without 4910 the number of live replicas would be 0: block gets lost - assertEquals(1, bm.countNodes(block.getLocalBlock()).liveReplicas()); + assertEquals(1, bm.countNodes( + bm.getStoredBlock(block.getLocalBlock())).liveReplicas()); } } finally { namesystem.writeUnlock(); @@ -219,7 +220,7 @@ public class TestOverReplicatedBlocks { out.close(); ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, p); assertEquals("Expected only one live replica for the block", 1, bm - .countNodes(block.getLocalBlock()).liveReplicas()); + .countNodes(bm.getStoredBlock(block.getLocalBlock())).liveReplicas()); } finally { cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java index c63badc..b9032f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java @@ -53,6 +53,11 @@ public class TestPendingReplication { // Number of datanodes in the cluster private static final int DATANODE_COUNT = 5; + private BlockInfo genBlockInfo(long id, long length, long gs) { + return new BlockInfoContiguous(new Block(id, length, gs), + (short) DATANODE_COUNT); + } + @Test public void testPendingReplication() { PendingReplicationBlocks pendingReplications; @@ -64,7 +69,7 @@ public class TestPendingReplication { // DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10); for (int i = 0; i < storages.length; i++) { - Block block = new Block(i, i, 0); + BlockInfo block = genBlockInfo(i, i, 0); DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i]; System.arraycopy(storages, 0, targets, 0, i); pendingReplications.increment(block, @@ -77,7 +82,7 @@ public class TestPendingReplication { // // remove one item and reinsert it // - Block blk = new Block(8, 8, 0); + BlockInfo blk = genBlockInfo(8, 8, 0); pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica assertEquals("pendingReplications.getNumReplicas ", 7, pendingReplications.getNumReplicas(blk)); @@ -97,7 +102,7 @@ public class TestPendingReplication { // are sane. // for (int i = 0; i < 10; i++) { - Block block = new Block(i, i, 0); + BlockInfo block = genBlockInfo(i, i, 0); int numReplicas = pendingReplications.getNumReplicas(block); assertTrue(numReplicas == i); } @@ -116,7 +121,7 @@ public class TestPendingReplication { } for (int i = 10; i < 15; i++) { - Block block = new Block(i, i, 0); + BlockInfo block = genBlockInfo(i, i, 0); pendingReplications.increment(block, DatanodeStorageInfo.toDatanodeDescriptors( DFSTestUtil.createDatanodeStorageInfos(i))); @@ -185,7 +190,7 @@ public class TestPendingReplication { assertEquals(1, blkManager.pendingReplications.size()); INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile(); - Block[] blocks = fileNode.getBlocks(); + BlockInfo[] blocks = fileNode.getBlocks(); assertEquals(DATANODE_COUNT - 1, blkManager.pendingReplications.getNumReplicas(blocks[0])); @@ -291,8 +296,8 @@ public class TestPendingReplication { BlockManagerTestUtil.computeAllPendingWork(bm); BlockManagerTestUtil.updateState(bm); assertEquals(bm.getPendingReplicationBlocksCount(), 1L); - assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock() - .getLocalBlock()), 2); + BlockInfo storedBlock = bm.getStoredBlock(block.getBlock().getLocalBlock()); + assertEquals(bm.pendingReplications.getNumReplicas(storedBlock), 2); // 4. delete the file fs.delete(filePath, true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java index 728934d..1a32892 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java @@ -58,7 +58,9 @@ public class TestRBWBlockInvalidation { private static NumberReplicas countReplicas(final FSNamesystem namesystem, ExtendedBlock block) { - return namesystem.getBlockManager().countNodes(block.getLocalBlock()); + final BlockManager blockManager = namesystem.getBlockManager(); + return blockManager.countNodes(blockManager.getStoredBlock( + block.getLocalBlock())); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index ac6c445..7710430 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -833,6 +833,10 @@ public class TestReplicationPolicy { assertEquals(targets.length, 2); assertTrue(isOnSameRack(targets[0], dataNodes[2])); } + + private BlockInfo genBlockInfo(long id) { + return new BlockInfoContiguous(new Block(id), (short) 3); + } /** * Test for the high priority blocks are processed before the low priority @@ -852,14 +856,14 @@ public class TestReplicationPolicy { .getNamesystem().getBlockManager().neededReplications; for (int i = 0; i < 100; i++) { // Adding the blocks directly to normal priority - neededReplications.add(new Block(random.nextLong()), 2, 0, 3); + neededReplications.add(genBlockInfo(random.nextLong()), 2, 0, 3); } // Lets wait for the replication interval, to start process normal // priority blocks Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); // Adding the block directly to high priority list - neededReplications.add(new Block(random.nextLong()), 1, 0, 3); + neededReplications.add(genBlockInfo(random.nextLong()), 1, 0, 3); // Lets wait for the replication interval Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); @@ -882,25 +886,26 @@ public class TestReplicationPolicy { for (int i = 0; i < 5; i++) { // Adding QUEUE_HIGHEST_PRIORITY block - underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3); + underReplicatedBlocks.add(genBlockInfo(random.nextLong()), 1, 0, 3); // Adding QUEUE_VERY_UNDER_REPLICATED block - underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7); + underReplicatedBlocks.add(genBlockInfo(random.nextLong()), 2, 0, 7); // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block - underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6); + underReplicatedBlocks.add(genBlockInfo(random.nextLong()), 6, 0, 6); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6); + underReplicatedBlocks.add(genBlockInfo(random.nextLong()), 5, 0, 6); // Adding QUEUE_WITH_CORRUPT_BLOCKS block - underReplicatedBlocks.add(new Block(random.nextLong()), 0, 0, 3); + underReplicatedBlocks.add(genBlockInfo(random.nextLong()), 0, 0, 3); } // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks // from // QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED. - List<List<Block>> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6); + List<List<BlockInfo>> chosenBlocks = + underReplicatedBlocks.chooseUnderReplicatedBlocks(6); assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0); // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from @@ -910,7 +915,7 @@ public class TestReplicationPolicy { assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0); // Adding QUEUE_HIGHEST_PRIORITY - underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3); + underReplicatedBlocks.add(genBlockInfo(random.nextLong()), 1, 0, 3); // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED @@ -928,7 +933,7 @@ public class TestReplicationPolicy { /** asserts the chosen blocks with expected priority blocks */ private void assertTheChosenBlocks( - List<List<Block>> chosenBlocks, int firstPrioritySize, + List<List<BlockInfo>> chosenBlocks, int firstPrioritySize, int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize, int fifthPrioritySize) { assertEquals( @@ -1102,9 +1107,9 @@ public class TestReplicationPolicy { public void testUpdateDoesNotCauseSkippedReplication() { UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks(); - Block block1 = new Block(random.nextLong()); - Block block2 = new Block(random.nextLong()); - Block block3 = new Block(random.nextLong()); + BlockInfo block1 = genBlockInfo(random.nextLong()); + BlockInfo block2 = genBlockInfo(random.nextLong()); + BlockInfo block3 = genBlockInfo(random.nextLong()); // Adding QUEUE_VERY_UNDER_REPLICATED block final int block1CurReplicas = 2; @@ -1118,7 +1123,7 @@ public class TestReplicationPolicy { // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block3, 2, 0, 6); - List<List<Block>> chosenBlocks; + List<List<BlockInfo>> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. @@ -1151,8 +1156,8 @@ public class TestReplicationPolicy { BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; - Block block1 = new Block(random.nextLong()); - Block block2 = new Block(random.nextLong()); + BlockInfo block1 = genBlockInfo(random.nextLong()); + BlockInfo block2 = genBlockInfo(random.nextLong()); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 1, 1); @@ -1160,7 +1165,7 @@ public class TestReplicationPolicy { // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block2, 0, 1, 1); - List<List<Block>> chosenBlocks; + List<List<BlockInfo>> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. @@ -1197,8 +1202,8 @@ public class TestReplicationPolicy { BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; - Block block1 = new Block(random.nextLong()); - Block block2 = new Block(random.nextLong()); + BlockInfo block1 = genBlockInfo(random.nextLong()); + BlockInfo block2 = genBlockInfo(random.nextLong()); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 1, 1); @@ -1206,7 +1211,7 @@ public class TestReplicationPolicy { // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block2, 0, 1, 1); - List<List<Block>> chosenBlocks; + List<List<BlockInfo>> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. @@ -1260,8 +1265,8 @@ public class TestReplicationPolicy { BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; - Block block1 = new Block(random.nextLong()); - Block block2 = new Block(random.nextLong()); + BlockInfo block1 = genBlockInfo(random.nextLong()); + BlockInfo block2 = genBlockInfo(random.nextLong()); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 1, 1); @@ -1269,14 +1274,14 @@ public class TestReplicationPolicy { // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block2, 0, 1, 1); - List<List<Block>> chosenBlocks; + List<List<BlockInfo>> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0); - bm.setReplication((short)0, (short)1, "", block1); + bm.setReplication((short)0, (short)1, "", (BlockInfoContiguous) block1); // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java index e87a043..de36e07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java @@ -28,6 +28,10 @@ import static org.junit.Assert.fail; public class TestUnderReplicatedBlockQueues { + private BlockInfo genBlockInfo(long id) { + return new BlockInfoContiguous(new Block(id), (short) 3); + } + /** * Test that adding blocks with different replication counts puts them * into different queues @@ -36,11 +40,11 @@ public class TestUnderReplicatedBlockQueues { @Test public void testBlockPriorities() throws Throwable { UnderReplicatedBlocks queues = new UnderReplicatedBlocks(); - Block block1 = new Block(1); - Block block2 = new Block(2); - Block block_very_under_replicated = new Block(3); - Block block_corrupt = new Block(4); - Block block_corrupt_repl_one = new Block(5); + BlockInfo block1 = genBlockInfo(1); + BlockInfo block2 = genBlockInfo(2); + BlockInfo block_very_under_replicated = genBlockInfo(3); + BlockInfo block_corrupt = genBlockInfo(4); + BlockInfo block_corrupt_repl_one = genBlockInfo(5); //add a block with a single entry assertAdded(queues, block1, 1, 0, 3); @@ -82,7 +86,7 @@ public class TestUnderReplicatedBlockQueues { } private void assertAdded(UnderReplicatedBlocks queues, - Block block, + BlockInfo block, int curReplicas, int decomissionedReplicas, int expectedReplicas) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java index e6bf067..80ad359 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@ -81,6 +82,7 @@ public class TestReadOnlySharedStorage { private DatanodeInfo readOnlyDataNode; private Block block; + private BlockInfo storedBlock; private ExtendedBlock extendedBlock; @@ -132,6 +134,7 @@ public class TestReadOnlySharedStorage { LocatedBlock locatedBlock = getLocatedBlock(); extendedBlock = locatedBlock.getBlock(); block = extendedBlock.getLocalBlock(); + storedBlock = blockManager.getStoredBlock(block); assertThat(locatedBlock.getLocations().length, is(1)); normalDataNode = locatedBlock.getLocations()[0]; @@ -188,7 +191,7 @@ public class TestReadOnlySharedStorage { } private void validateNumberReplicas(int expectedReplicas) throws IOException { - NumberReplicas numberReplicas = blockManager.countNodes(block); + NumberReplicas numberReplicas = blockManager.countNodes(storedBlock); assertThat(numberReplicas.liveReplicas(), is(expectedReplicas)); assertThat(numberReplicas.excessReplicas(), is(0)); assertThat(numberReplicas.corruptReplicas(), is(0)); @@ -230,7 +233,7 @@ public class TestReadOnlySharedStorage { cluster.getNameNode(), normalDataNode.getXferAddr()); // The live replica count should now be zero (since the NORMAL replica is offline) - NumberReplicas numberReplicas = blockManager.countNodes(block); + NumberReplicas numberReplicas = blockManager.countNodes(storedBlock); assertThat(numberReplicas.liveReplicas(), is(0)); // The block should be reported as under-replicated @@ -263,7 +266,7 @@ public class TestReadOnlySharedStorage { waitForLocations(1); // However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count - NumberReplicas numberReplicas = blockManager.countNodes(block); + NumberReplicas numberReplicas = blockManager.countNodes(storedBlock); assertThat(numberReplicas.corruptReplicas(), is(0)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c0e02ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java index 168ebb9..5058110 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; import org.junit.Test; @@ -259,7 +260,9 @@ public class TestProcessCorruptBlocks { } private static NumberReplicas countReplicas(final FSNamesystem namesystem, ExtendedBlock block) { - return namesystem.getBlockManager().countNodes(block.getLocalBlock()); + final BlockManager blockManager = namesystem.getBlockManager(); + return blockManager.countNodes(blockManager.getStoredBlock( + block.getLocalBlock())); } private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,