HDFS-10801. [SPS]: Protocol buffer changes for sending storage movement commands from NN to DN. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13a80503 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13a80503 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13a80503 Branch: refs/heads/HDFS-10285 Commit: 13a8050350962d939fd808cf2e20604311b81b8e Parents: 07299e2 Author: Rakesh Radhakrishnan <rake...@apache.org> Authored: Tue Oct 11 11:44:06 2016 +0530 Committer: Uma Maheswara Rao G <uma.ganguma...@intel.com> Committed: Mon Oct 17 16:05:26 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 90 ++++++++++++++++++++ .../blockmanagement/DatanodeDescriptor.java | 15 ---- .../server/blockmanagement/DatanodeManager.java | 13 ++- .../hdfs/server/datanode/BPOfferService.java | 8 ++ .../hadoop/hdfs/server/datanode/DataNode.java | 7 ++ .../datanode/StoragePolicySatisfyWorker.java | 22 ++++- .../protocol/BlockStorageMovementCommand.java | 71 ++++++++++++++- .../hdfs/server/protocol/DatanodeProtocol.java | 1 + .../src/main/proto/DatanodeProtocol.proto | 22 +++++ .../namenode/TestStoragePolicySatisfier.java | 86 +++++++++++-------- 10 files changed, 273 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 78371f5..28c7590 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -48,6 
+48,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; @@ -89,6 +91,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.Block import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; @@ -457,6 +461,8 @@ public class PBHelper { return PBHelper.convert(proto.getBlkIdCmd()); case BlockECReconstructionCommand: return PBHelper.convert(proto.getBlkECReconstructionCmd()); + case BlockStorageMovementCommand: + return PBHelper.convert(proto.getBlkStorageMovementCmd()); default: return null; } @@ -591,6 +597,11 @@ public class PBHelper { .setBlkECReconstructionCmd( convert((BlockECReconstructionCommand) datanodeCommand)); break; + case 
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT: + builder.setCmdType(DatanodeCommandProto.Type.BlockStorageMovementCommand) + .setBlkStorageMovementCmd( + convert((BlockStorageMovementCommand) datanodeCommand)); + break; case DatanodeProtocol.DNA_UNKNOWN: //Not expected default: builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand); @@ -963,4 +974,83 @@ public class PBHelper { DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION, blkECReconstructionInfos); } + + private static BlockStorageMovementCommandProto convert( + BlockStorageMovementCommand blkStorageMovementCmd) { + BlockStorageMovementCommandProto.Builder builder = + BlockStorageMovementCommandProto.newBuilder(); + + builder.setTrackID(blkStorageMovementCmd.getTrackID()); + builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId()); + Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd + .getBlockMovingTasks(); + for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { + builder.addBlockStorageMovement( + convertBlockMovingInfo(blkMovingInfo)); + } + return builder.build(); + } + + private static BlockStorageMovementProto convertBlockMovingInfo( + BlockMovingInfo blkMovingInfo) { + BlockStorageMovementProto.Builder builder = BlockStorageMovementProto + .newBuilder(); + builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock())); + + DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources(); + builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos)); + + DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets(); + builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos)); + + StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes(); + builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes)); + + StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes(); + builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); + + return builder.build(); + } + + private static DatanodeCommand convert( + 
BlockStorageMovementCommandProto blkStorageMovementCmdProto) { + Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>(); + List<BlockStorageMovementProto> blkSPSatisfyList = + blkStorageMovementCmdProto.getBlockStorageMovementList(); + for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) { + blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy)); + } + return new BlockStorageMovementCommand( + DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, + blkStorageMovementCmdProto.getTrackID(), + blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos); + } + + private static BlockMovingInfo convertBlockMovingInfo( + BlockStorageMovementProto blockStoragePolicySatisfyProto) { + BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock(); + Block block = PBHelperClient.convert(blockProto); + + DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto + .getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto); + + DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto + .getTargetDnInfos(); + DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto); + + StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto + .getSourceStorageTypes(); + StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes( + srcStorageTypesProto.getStorageTypesList(), + srcStorageTypesProto.getStorageTypesList().size()); + + StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto + .getTargetStorageTypes(); + StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes( + targetStorageTypesProto.getStorageTypesList(), + targetStorageTypesProto.getStorageTypesList().size()); + return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos, + srcStorageTypes, targetStorageTypes); + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 47dcd74..8bbc98a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -940,19 +940,4 @@ public class DatanodeDescriptor extends DatanodeInfo { public List<BlockMovingInfo> getBlocksToMoveStorages() { return storageMovementBlocks.poll(); } - - // TODO: we will remove this method once DN side handling integrated. We can - // convert the test to check real block movements instead of this ds. 
- @VisibleForTesting - public List<BlockMovingInfo> getStorageMovementPendingItems() { - List<BlockMovingInfo> flatList = new ArrayList<>(); - Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks - .iterator(); - while (iterator.hasNext()) { - List<BlockMovingInfo> next = iterator.next(); - flatList.addAll(next); - } - return flatList; - } } - http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 8aa1545..026ad72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1600,10 +1600,17 @@ public class DatanodeManager { nodeinfo.setBalancerBandwidth(0); } - List<BlockMovingInfo> blocksToMoveStorages = nodeinfo + // check pending block storage movement tasks + List<BlockMovingInfo> pendingBlockMovementList = nodeinfo .getBlocksToMoveStorages(); - if (blocksToMoveStorages != null) { - // TODO: create BlockStorageMovementCommand and add here in response. + if (pendingBlockMovementList != null) { + // TODO: trackID is used to track the block movement sent to the coordinator + // datanode. Need to implement tracking logic. Temporarily, using a + // constant value -1. 
+ long trackID = -1; + cmds.add(new BlockStorageMovementCommand( + DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, trackID, blockPoolId, + pendingBlockMovementList)); } if (!cmds.isEmpty()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 00102eb..f5f6738 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -732,6 +732,13 @@ class BPOfferService { ((BlockECReconstructionCommand) cmd).getECTasks(); dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks); break; + case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT: + LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT"); + BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd; + dn.getStoragePolicySatisfyWorker().processBlockMovingTasks( + blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(), + blkSPSCmd.getBlockMovingTasks()); + break; default: LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); } @@ -762,6 +769,7 @@ class BPOfferService { case DatanodeProtocol.DNA_CACHE: case DatanodeProtocol.DNA_UNCACHE: case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: + case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT: LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction()); break; default: http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 8f65efe..f099304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -381,6 +381,7 @@ public class DataNode extends ReconfigurableBase private String dnUserName = null; private BlockRecoveryWorker blockRecoveryWorker; private ErasureCodingWorker ecWorker; + private StoragePolicySatisfyWorker storagePolicySatisfyWorker; private final Tracer tracer; private final TracerConfigurationManager tracerConfigurationManager; private static final int NUM_CORES = Runtime.getRuntime() @@ -1359,6 +1360,8 @@ public class DataNode extends ReconfigurableBase ecWorker = new ErasureCodingWorker(getConf(), this); blockRecoveryWorker = new BlockRecoveryWorker(this); + storagePolicySatisfyWorker = + new StoragePolicySatisfyWorker(getConf(), this); blockPoolManager = new BlockPoolManager(this); blockPoolManager.refreshNamenodes(getConf()); @@ -3471,4 +3474,8 @@ public class DataNode extends ReconfigurableBase void setBlockScanner(BlockScanner blockScanner) { this.blockScanner = blockScanner; } + + StoragePolicySatisfyWorker getStoragePolicySatisfyWorker() { + return storagePolicySatisfyWorker; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index fa408f6..2c99963 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -28,6 +28,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.List; import java.util.concurrent.Callable; @@ -126,8 +127,25 @@ public class StoragePolicySatisfyWorker { return moverThreadPool; } + /** + * Handles the given set of block movement tasks. This will iterate over the + * block movement list and submit each block movement task asynchronously in a + * separate thread. Each task will move the block replica to the target node & + * wait for the completion. + * + * TODO: Presently this function is a blocking call, this has to be refined by + * moving the tracking logic to another tracker thread. HDFS-10884 jira + * addresses the same. + * + * @param trackID + * unique tracking identifier + * @param blockPoolID + * block pool ID + * @param blockMovingInfos + * list of blocks to be moved + */ public void processBlockMovingTasks(long trackID, String blockPoolID, - List<BlockMovingInfo> blockMovingInfos) { + Collection<BlockMovingInfo> blockMovingInfos) { Future<Void> moveCallable = null; for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { assert blkMovingInfo @@ -143,8 +161,6 @@ public class StoragePolicySatisfyWorker { } } - // TODO: Presently this function act as a blocking call, this has to be - // refined by moving the tracking logic to another tracker thread. 
for (int i = 0; i < moverTaskFutures.size(); i++) { try { moveCallable = moverExecutorCompletionService.take(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java index c1ab800..7c97f1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.protocol; import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; @@ -33,12 +34,60 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; * {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker} * service. After the block movement this DataNode sends response back to the * NameNode about the movement status. + * + * The coordinator datanode will use 'trackId' identifier to coordinate the block + * movement of the given set of blocks. TrackId is a unique identifier that + * represents a group of blocks. Namenode will generate this unique value and + * send it to the coordinator datanode along with the + * BlockStorageMovementCommand. Datanode will monitor the completion of the + * block movements that grouped under this trackId and notifies Namenode about + * the completion status. 
*/ public class BlockStorageMovementCommand extends DatanodeCommand { - // TODO: constructor needs to be refined based on the block movement data - // structure. - BlockStorageMovementCommand(int action) { + private final long trackID; + private final String blockPoolId; + private final Collection<BlockMovingInfo> blockMovingTasks; + + /** + * Block storage movement command constructor. + * + * @param action + * protocol specific action + * @param trackID + * unique identifier to monitor the given set of block movements + * @param blockPoolId + * block pool ID + * @param blockMovingInfos + * block to storage info that will be used for movement + */ + public BlockStorageMovementCommand(int action, long trackID, + String blockPoolId, Collection<BlockMovingInfo> blockMovingInfos) { super(action); + this.trackID = trackID; + this.blockPoolId = blockPoolId; + this.blockMovingTasks = blockMovingInfos; + } + + /** + * Returns trackID, which will be used to monitor the block movement assigned + * to this coordinator datanode. + */ + public long getTrackID() { + return trackID; + } + + /** + * Returns block pool ID. + */ + public String getBlockPoolId() { + return blockPoolId; + } + + /** + * Returns the list of blocks to be moved. + */ + public Collection<BlockMovingInfo> getBlockMovingTasks() { + return blockMovingTasks; } /** @@ -47,10 +96,24 @@ public class BlockStorageMovementCommand extends DatanodeCommand { public static class BlockMovingInfo { private Block blk; private DatanodeInfo[] sourceNodes; - private StorageType[] sourceStorageTypes; private DatanodeInfo[] targetNodes; + private StorageType[] sourceStorageTypes; private StorageType[] targetStorageTypes; + /** + * Block to storage info constructor. 
+ * + * @param block + * block + * @param sourceDnInfos + * node that can be the sources of a block move + * @param targetDnInfos + * target datanode info + * @param srcStorageTypes + * type of source storage media + * @param targetStorageTypes + * type of destination storage media + */ public BlockMovingInfo(Block block, DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos, StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 8c4359f..f8b4934 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -77,6 +77,7 @@ public interface DatanodeProtocol { final static int DNA_CACHE = 9; // cache blocks final static int DNA_UNCACHE = 10; // uncache blocks final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command + final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command /** * Register Datanode. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 9bae4c3..fc64838 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -61,6 +61,7 @@ message DatanodeCommandProto { NullDatanodeCommand = 7; BlockIdCommand = 8; BlockECReconstructionCommand = 9; + BlockStorageMovementCommand = 10; } required Type cmdType = 1; // Type of the command @@ -75,6 +76,7 @@ message DatanodeCommandProto { optional RegisterCommandProto registerCmd = 7; optional BlockIdCommandProto blkIdCmd = 8; optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9; + optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10; } /** @@ -155,6 +157,26 @@ message BlockECReconstructionCommandProto { repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1; } + /** + * Block storage movement command + */ +message BlockStorageMovementCommandProto { + required uint64 trackID = 1; + required string blockPoolId = 2; + repeated BlockStorageMovementProto blockStorageMovement = 3; +} + +/** + * Block storage movement information + */ +message BlockStorageMovementProto { + required BlockProto block = 1; + required DatanodeInfosProto sourceDnInfos = 2; + required DatanodeInfosProto targetDnInfos = 3; + required StorageTypesProto sourceStorageTypes = 4; + required StorageTypesProto targetStorageTypes = 5; +} + /** * registration - Information of the datanode registering with the namenode */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/13a80503/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index b61814d..37664b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -18,9 +18,6 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -29,8 +26,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Before; import org.junit.Test; @@ -74,9 +70,6 @@ public class TestStoragePolicySatisfier { try { // Change policy to ALL_SSD distributedFS.setStoragePolicy(new Path(file), "COLD"); - Set<DatanodeDescriptor> previousNodes = - hdfsCluster.getNameNode().getNamesystem().getBlockManager() - .getDatanodeManager().getDatanodes(); FSNamesystem namesystem = hdfsCluster.getNamesystem(); INode inode = namesystem.getFSDirectory().getINode(file); @@ -91,8 +84,8 @@ public class TestStoragePolicySatisfier { hdfsCluster.triggerHeartbeats(); // Wait till namenode notified about the block location details - waitExpectedStorageType(StorageType.ARCHIVE, 
distributedFS, previousNodes, - 6, 30000); + waitExpectedStorageType(file, StorageType.ARCHIVE, distributedFS, 3, + 30000); } finally { hdfsCluster.shutdown(); } @@ -104,9 +97,6 @@ public class TestStoragePolicySatisfier { try { // Change policy to ALL_SSD distributedFS.setStoragePolicy(new Path(file), "ALL_SSD"); - Set<DatanodeDescriptor> previousNodes = - hdfsCluster.getNameNode().getNamesystem().getBlockManager() - .getDatanodeManager().getDatanodes(); FSNamesystem namesystem = hdfsCluster.getNamesystem(); INode inode = namesystem.getFSDirectory().getINode(file); @@ -123,8 +113,34 @@ public class TestStoragePolicySatisfier { hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas - waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes, 6, - 30000); + waitExpectedStorageType(file, StorageType.SSD, distributedFS, 3, 30000); + } finally { + hdfsCluster.shutdown(); + } + } + + @Test(timeout = 300000) + public void testWhenStoragePolicySetToONESSD() + throws Exception { + try { + // Change policy to ONE_SSD + distributedFS.setStoragePolicy(new Path(file), "ONE_SSD"); + FSNamesystem namesystem = hdfsCluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode(file); + + StorageType[][] newtypes = + new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; + + // Making sure SDD based nodes added to cluster. Adding SSD based + // datanodes. 
+ startAdditionalDNs(config, 1, numOfDatanodes, newtypes, + storagesPerDatanode, capacity, hdfsCluster); + namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + hdfsCluster.triggerHeartbeats(); + // Wait till StoragePolicySatisfier has identified the block to move to SSD + // areas + waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000); + waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000); + } finally { + hdfsCluster.shutdown(); + } @@ -174,35 +190,31 @@ public class TestStoragePolicySatisfier { return cluster; } - // TODO: this assertion can be changed to end to end based assertion later - // when DN side processing work integrated to this work. - private void waitExpectedStorageType(final StorageType expectedStorageType, - final DistributedFileSystem dfs, - final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount, - int timeout) throws Exception { + // Check whether the Block movement has been successfully completed to satisfy + // the storage policy for the given file. 
+ private void waitExpectedStorageType(final String fileName, + final StorageType expectedStorageType, final DistributedFileSystem dfs, + int expectedStorageCount, int timeout) throws Exception { GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { - Iterator<DatanodeDescriptor> iterator = previousNodes.iterator(); - int archiveCount = 0; - while (iterator.hasNext()) { - DatanodeDescriptor dn = iterator.next(); - List<BlockMovingInfo> pendingItemsToMove = - dn.getStorageMovementPendingItems(); - for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) { - StorageType[] targetStorageTypes = - blkInfoToMoveStorage.getTargetStorageTypes(); - for (StorageType storageType : targetStorageTypes) { - if (storageType == expectedStorageType) { - archiveCount++; - } - } + LocatedBlock lb = null; + try { + lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0); + } catch (IOException e) { + LOG.error("Exception while getting located blocks", e); + return false; + } + int actualStorageCount = 0; + for (StorageType storageType : lb.getStorageTypes()) { + if (expectedStorageType == storageType) { + actualStorageCount++; } } LOG.info( expectedStorageType + " replica count, expected={} and actual={}", - expectedArchiveCount, archiveCount); - return expectedArchiveCount == archiveCount; + expectedStorageType, actualStorageCount); + return expectedStorageCount == actualStorageCount; } }, 100, timeout); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org