HDFS-13110: [SPS]: Reduce the number of APIs in NamenodeProtocol used by external satisfier. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79925028 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79925028 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79925028 Branch: refs/heads/HDFS-10285 Commit: 7992502873e3730e15d9c6487c2922a5d22666e0 Parents: 87e125a Author: Rakesh Radhakrishnan <rake...@apache.org> Authored: Fri Feb 16 17:01:38 2018 +0530 Committer: Rakesh Radhakrishnan <rake...@apache.org> Committed: Tue Jul 31 12:10:48 2018 +0530 ---------------------------------------------------------------------- .../NamenodeProtocolServerSideTranslatorPB.java | 46 +---- .../NamenodeProtocolTranslatorPB.java | 42 +---- .../hdfs/server/namenode/FSTreeTraverser.java | 2 +- .../hdfs/server/namenode/NameNodeRpcServer.java | 32 +--- .../server/namenode/ReencryptionHandler.java | 2 +- .../sps/BlockStorageMovementAttemptedItems.java | 42 +++-- .../sps/BlockStorageMovementNeeded.java | 119 +++++++------ .../hdfs/server/namenode/sps/Context.java | 55 +++--- .../hdfs/server/namenode/sps/FileCollector.java | 48 +++++ .../server/namenode/sps/FileIdCollector.java | 43 ----- .../namenode/sps/IntraSPSNameNodeContext.java | 39 ++--- .../sps/IntraSPSNameNodeFileIdCollector.java | 23 +-- .../hdfs/server/namenode/sps/ItemInfo.java | 39 +++-- .../hdfs/server/namenode/sps/SPSService.java | 32 ++-- .../namenode/sps/StoragePolicySatisfier.java | 129 +++++++++----- .../sps/StoragePolicySatisfyManager.java | 6 +- .../hdfs/server/protocol/NamenodeProtocol.java | 24 +-- .../sps/ExternalSPSBlockMoveTaskHandler.java | 4 +- .../hdfs/server/sps/ExternalSPSContext.java | 60 +++---- .../server/sps/ExternalSPSFileIDCollector.java | 174 ------------------- .../sps/ExternalSPSFilePathCollector.java | 172 ++++++++++++++++++ .../sps/ExternalStoragePolicySatisfier.java | 7 +- .../src/main/proto/NamenodeProtocol.proto | 27 +-- .../TestBlockStorageMovementAttemptedItems.java | 27 ++- .../sps/TestStoragePolicySatisfier.java | 52 +++--- ...stStoragePolicySatisfierWithStripedFile.java | 15 +- .../sps/TestExternalStoragePolicySatisfier.java | 148 +++++++++++----- 27 files changed, 701 insertions(+), 708 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java index 25eafdf..ed176cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java @@ -35,16 +35,12 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksReq import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto; @@ -267,15 +263,15 @@ public class NamenodeProtocolServerSideTranslatorPB implements } @Override - public GetNextSPSPathIdResponseProto getNextSPSPathId( - RpcController controller, GetNextSPSPathIdRequestProto request) + public GetNextSPSPathResponseProto getNextSPSPath( + RpcController controller, GetNextSPSPathRequestProto request) throws ServiceException { try { - Long nextSPSPathId = impl.getNextSPSPathId(); - if (nextSPSPathId == null) { - return GetNextSPSPathIdResponseProto.newBuilder().build(); + String nextSPSPath = impl.getNextSPSPath(); + if (nextSPSPath == null) { + return GetNextSPSPathResponseProto.newBuilder().build(); } - return GetNextSPSPathIdResponseProto.newBuilder().setFileId(nextSPSPathId) + return GetNextSPSPathResponseProto.newBuilder().setSpsPath(nextSPSPath) .build(); } catch (IOException e) { throw new ServiceException(e); @@ -283,17 +279,6 @@ public class NamenodeProtocolServerSideTranslatorPB implements } @Override - public GetFilePathResponseProto getFilePath(RpcController controller, - GetFilePathRequestProto request) throws ServiceException { - try { - return GetFilePathResponseProto.newBuilder() - .setSrcPath(impl.getFilePath(request.getFileId())).build(); - } catch (IOException e) { - throw new ServiceException(e); - } - } - - @Override public CheckDNSpaceResponseProto checkDNSpaceForScheduling( RpcController controller, CheckDNSpaceRequestProto request) throws ServiceException { @@ -309,19 +294,4 @@ public class NamenodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } } - - @Override - public HasLowRedundancyBlocksResponseProto hasLowRedundancyBlocks( - RpcController controller, HasLowRedundancyBlocksRequestProto request) - throws ServiceException { - try { - return HasLowRedundancyBlocksResponseProto.newBuilder() - .setHasLowRedundancyBlocks( - impl.hasLowRedundancyBlocks(request.getInodeId())) - .build(); - } catch (IOException e) { - throw new ServiceException(e); - } - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index 8bff499..d2e97a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -34,12 +34,10 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeys import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto; @@ -271,24 +269,13 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, } @Override - public Long getNextSPSPathId() throws IOException { - GetNextSPSPathIdRequestProto req = - GetNextSPSPathIdRequestProto.newBuilder().build(); + public String getNextSPSPath() throws IOException { + GetNextSPSPathRequestProto req = + GetNextSPSPathRequestProto.newBuilder().build(); try { - GetNextSPSPathIdResponseProto nextSPSPathId = - rpcProxy.getNextSPSPathId(NULL_CONTROLLER, req); - return nextSPSPathId.hasFileId() ? nextSPSPathId.getFileId() : null; - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public String getFilePath(Long inodeId) throws IOException { - GetFilePathRequestProto req = - GetFilePathRequestProto.newBuilder().setFileId(inodeId).build(); - try { - return rpcProxy.getFilePath(NULL_CONTROLLER, req).getSrcPath(); + GetNextSPSPathResponseProto nextSPSPath = + rpcProxy.getNextSPSPath(NULL_CONTROLLER, req); + return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -308,17 +295,4 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, throw ProtobufHelper.getRemoteException(e); } } - - @Override - public boolean hasLowRedundancyBlocks(long inodeId) throws IOException { - HasLowRedundancyBlocksRequestProto req = HasLowRedundancyBlocksRequestProto - .newBuilder().setInodeId(inodeId).build(); - try { - return rpcProxy.hasLowRedundancyBlocks(NULL_CONTROLLER, req) - .getHasLowRedundancyBlocks(); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java index a7d633f..2acbda4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java @@ -310,7 +310,7 @@ public abstract class FSTreeTraverser { * @throws IOException * @throws InterruptedException */ - protected abstract void submitCurrentBatch(long startId) + protected abstract void submitCurrentBatch(Long startId) throws IOException, InterruptedException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 97f38c7..6fe38d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -2561,20 +2561,9 @@ public class NameNodeRpcServer implements NamenodeProtocols { } @Override - public String getFilePath(Long inodeId) throws IOException { + public String getNextSPSPath() throws IOException { checkNNStartup(); - String operationName = "getFilePath"; - namesystem.checkSuperuserPrivilege(operationName); - if (nn.isStandbyState()) { - throw new StandbyException("Not supported by Standby Namenode."); - } - return namesystem.getFilePath(inodeId); - } - - @Override - public Long getNextSPSPathId() throws IOException { - checkNNStartup(); - String operationName = "getNextSPSPathId"; + String operationName = "getNextSPSPath"; namesystem.checkSuperuserPrivilege(operationName); if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); @@ -2588,7 +2577,11 @@ public class NameNodeRpcServer implements NamenodeProtocols { + " inside namenode, so external SPS is not allowed to fetch" + " the path Ids"); } - return namesystem.getBlockManager().getSPSManager().getNextPathId(); + Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId(); + if (pathId == null) { + return null; + } + return namesystem.getFilePath(pathId); } @Override @@ -2603,15 +2596,4 @@ public class NameNodeRpcServer implements NamenodeProtocols { return namesystem.getBlockManager().getDatanodeManager() .verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize); } - - @Override - public boolean hasLowRedundancyBlocks(long inodeId) throws IOException { - checkNNStartup(); - String operationName = "hasLowRedundancyBlocks"; - namesystem.checkSuperuserPrivilege(operationName); - if (nn.isStandbyState()) { - throw new StandbyException("Not supported by Standby Namenode."); - } - return namesystem.getBlockManager().hasLowRedundancyBlocks(inodeId); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java index feacd74..c8c8d68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java @@ -702,7 +702,7 @@ public class ReencryptionHandler implements Runnable { * @throws InterruptedException */ @Override - protected void submitCurrentBatch(final long zoneId) throws IOException, + protected void submitCurrentBatch(final Long zoneId) throws IOException, InterruptedException { if (currentBatch.isEmpty()) { return; http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java index ea7a093..d2f0bb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java @@ -45,8 +45,13 @@ import com.google.common.annotations.VisibleForTesting; * entries from tracking. If there is no DN reports about movement attempt * finished for a longer time period, then such items will retries automatically * after timeout. The default timeout would be 5 minutes. + * + * @param <T> + * is identifier of inode or full path name of inode. Internal sps will + * use the file inodeId for the block movement. External sps will use + * file string path representation for the block movement. */ -public class BlockStorageMovementAttemptedItems{ +public class BlockStorageMovementAttemptedItems<T> { private static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class); @@ -54,7 +59,7 @@ public class BlockStorageMovementAttemptedItems{ * A map holds the items which are already taken for blocks movements * processing and sent to DNs. */ - private final List<AttemptedItemInfo> storageMovementAttemptedItems; + private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems; private final List<Block> movementFinishedBlocks; private volatile boolean monitorRunning = true; private Daemon timerThread = null; @@ -70,11 +75,11 @@ public class BlockStorageMovementAttemptedItems{ // a request is timed out. // private long minCheckTimeout = 1 * 60 * 1000; // minimum value - private BlockStorageMovementNeeded blockStorageMovementNeeded; - private final SPSService service; + private BlockStorageMovementNeeded<T> blockStorageMovementNeeded; + private final SPSService<T> service; - public BlockStorageMovementAttemptedItems(SPSService service, - BlockStorageMovementNeeded unsatisfiedStorageMovementFiles, + public BlockStorageMovementAttemptedItems(SPSService<T> service, + BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles, BlockMovementListener blockMovementListener) { this.service = service; long recheckTimeout = this.service.getConf().getLong( @@ -100,7 +105,7 @@ public class BlockStorageMovementAttemptedItems{ * @param itemInfo * - tracking info */ - public void add(AttemptedItemInfo itemInfo) { + public void add(AttemptedItemInfo<T> itemInfo) { synchronized (storageMovementAttemptedItems) { storageMovementAttemptedItems.add(itemInfo); } @@ -190,25 +195,24 @@ public class BlockStorageMovementAttemptedItems{ @VisibleForTesting void blocksStorageMovementUnReportedItemsCheck() { synchronized (storageMovementAttemptedItems) { - Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems + Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems .iterator(); long now = monotonicNow(); while (iter.hasNext()) { - AttemptedItemInfo itemInfo = iter.next(); + AttemptedItemInfo<T> itemInfo = iter.next(); if (now > itemInfo.getLastAttemptedOrReportedTime() + selfRetryTimeout) { - Long blockCollectionID = itemInfo.getFileId(); + T file = itemInfo.getFile(); synchronized (movementFinishedBlocks) { - ItemInfo candidate = new ItemInfo(itemInfo.getStartId(), - blockCollectionID, itemInfo.getRetryCount() + 1); + ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), + file, itemInfo.getRetryCount() + 1); blockStorageMovementNeeded.add(candidate); iter.remove(); LOG.info("TrackID: {} becomes timed out and moved to needed " - + "retries queue for next iteration.", blockCollectionID); + + "retries queue for next iteration.", file); } } } - } } @@ -219,17 +223,17 @@ public class BlockStorageMovementAttemptedItems{ while (finishedBlksIter.hasNext()) { Block blk = finishedBlksIter.next(); synchronized (storageMovementAttemptedItems) { - Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems - .iterator(); + Iterator<AttemptedItemInfo<T>> iterator = + storageMovementAttemptedItems.iterator(); while (iterator.hasNext()) { - AttemptedItemInfo attemptedItemInfo = iterator.next(); + AttemptedItemInfo<T> attemptedItemInfo = iterator.next(); attemptedItemInfo.getBlocks().remove(blk); if (attemptedItemInfo.getBlocks().isEmpty()) { // TODO: try add this at front of the Queue, so that this element // gets the chance first and can be cleaned from queue quickly as // all movements already done. - blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo - .getStartId(), attemptedItemInfo.getFileId(), + blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo + .getStartPath(), attemptedItemInfo.getFile(), attemptedItemInfo.getRetryCount() + 1)); iterator.remove(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java index c683a63..a194876 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java @@ -43,31 +43,36 @@ import com.google.common.annotations.VisibleForTesting; * schedule the block collection IDs for movement. It track the info of * scheduled items and remove the SPS xAttr from the file/Directory once * movement is success. + * + * @param <T> + * is identifier of inode or full path name of inode. Internal sps will + * use the file inodeId for the block movement. External sps will use + * file string path representation for the block movement. */ @InterfaceAudience.Private -public class BlockStorageMovementNeeded { +public class BlockStorageMovementNeeded<T> { public static final Logger LOG = LoggerFactory.getLogger(BlockStorageMovementNeeded.class); - private final Queue<ItemInfo> storageMovementNeeded = - new LinkedList<ItemInfo>(); + private final Queue<ItemInfo<T>> storageMovementNeeded = + new LinkedList<ItemInfo<T>>(); /** - * Map of startId and number of child's. Number of child's indicate the + * Map of startPath and number of child's. Number of child's indicate the * number of files pending to satisfy the policy. */ - private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory = - new HashMap<Long, DirPendingWorkInfo>(); + private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory = + new HashMap<>(); - private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus = + private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus = new ConcurrentHashMap<>(); - private final Context ctxt; + private final Context<T> ctxt; private Daemon pathIdCollector; - private FileIdCollector fileIDCollector; + private FileCollector<T> fileCollector; private SPSPathIdProcessor pathIDProcessor; @@ -75,10 +80,10 @@ public class BlockStorageMovementNeeded { // NOT_AVAILABLE. private static long statusClearanceElapsedTimeMs = 300000; - public BlockStorageMovementNeeded(Context context, - FileIdCollector fileIDCollector) { + public BlockStorageMovementNeeded(Context<T> context, + FileCollector<T> fileCollector) { this.ctxt = context; - this.fileIDCollector = fileIDCollector; + this.fileCollector = fileCollector; pathIDProcessor = new SPSPathIdProcessor(); } @@ -89,8 +94,8 @@ public class BlockStorageMovementNeeded { * @param trackInfo * - track info for satisfy the policy */ - public synchronized void add(ItemInfo trackInfo) { - spsStatus.put(trackInfo.getStartId(), + public synchronized void add(ItemInfo<T> trackInfo) { + spsStatus.put(trackInfo.getFile(), new StoragePolicySatisfyPathStatusInfo( StoragePolicySatisfyPathStatus.IN_PROGRESS)); storageMovementNeeded.add(trackInfo); @@ -100,8 +105,8 @@ public class BlockStorageMovementNeeded { * Add the itemInfo list to tracking list for which storage movement expected * if necessary. * - * @param startId - * - start id + * @param startPath + * - start path * @param itemInfoList * - List of child in the directory * @param scanCompleted @@ -109,10 +114,10 @@ public class BlockStorageMovementNeeded { * scan. */ @VisibleForTesting - public synchronized void addAll(long startId, List<ItemInfo> itemInfoList, + public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList, boolean scanCompleted) { storageMovementNeeded.addAll(itemInfoList); - updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted); + updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted); } /** @@ -126,22 +131,22 @@ public class BlockStorageMovementNeeded { * elements to scan. */ @VisibleForTesting - public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) { + public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) { storageMovementNeeded.add(itemInfo); // This represents sps start id is file, so no need to update pending dir // stats. - if (itemInfo.getStartId() == itemInfo.getFileId()) { + if (itemInfo.getStartPath() == itemInfo.getFile()) { return; } - updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted); + updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted); } - private void updatePendingDirScanStats(long startId, int numScannedFiles, + private void updatePendingDirScanStats(T startPath, int numScannedFiles, boolean scanCompleted) { - DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath); if (pendingWork == null) { pendingWork = new DirPendingWorkInfo(); - pendingWorkForDirectory.put(startId, pendingWork); + pendingWorkForDirectory.put(startPath, pendingWork); } pendingWork.addPendingWorkCount(numScannedFiles); if (scanCompleted) { @@ -150,12 +155,12 @@ public class BlockStorageMovementNeeded { } /** - * Gets the block collection id for which storage movements check necessary + * Gets the satisfier files for which block storage movements check necessary * and make the movement if required. * - * @return block collection ID + * @return satisfier files */ - public synchronized ItemInfo get() { + public synchronized ItemInfo<T> get() { return storageMovementNeeded.poll(); } @@ -176,12 +181,12 @@ public class BlockStorageMovementNeeded { * Decrease the pending child count for directory once one file blocks moved * successfully. Remove the SPS xAttr if pending child count is zero. */ - public synchronized void removeItemTrackInfo(ItemInfo trackInfo, + public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo, boolean isSuccess) throws IOException { if (trackInfo.isDir()) { // If track is part of some start inode then reduce the pending // directory work count. - long startId = trackInfo.getStartId(); + T startId = trackInfo.getStartPath(); if (!ctxt.isFileExist(startId)) { // directory deleted just remove it. this.pendingWorkForDirectory.remove(startId); @@ -202,17 +207,17 @@ public class BlockStorageMovementNeeded { } else { // Remove xAttr if trackID doesn't exist in // storageMovementAttemptedItems or file policy satisfied. - ctxt.removeSPSHint(trackInfo.getFileId()); - updateStatus(trackInfo.getStartId(), isSuccess); + ctxt.removeSPSHint(trackInfo.getFile()); + updateStatus(trackInfo.getFile(), isSuccess); } } - public synchronized void clearQueue(long trackId) { + public synchronized void clearQueue(T trackId) { ctxt.removeSPSPathId(trackId); - Iterator<ItemInfo> iterator = storageMovementNeeded.iterator(); + Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator(); while (iterator.hasNext()) { - ItemInfo next = iterator.next(); - if (next.getStartId() == trackId) { + ItemInfo<T> next = iterator.next(); + if (next.getFile() == trackId) { iterator.remove(); } } @@ -222,7 +227,7 @@ public class BlockStorageMovementNeeded { /** * Mark inode status as SUCCESS in map. */ - private void updateStatus(long startId, boolean isSuccess){ + private void updateStatus(T startId, boolean isSuccess){ StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(startId); if (spsStatusInfo == null) { @@ -244,8 +249,8 @@ public class BlockStorageMovementNeeded { */ public synchronized void clearQueuesWithNotification() { // Remove xAttr from directories - Long trackId; - while ((trackId = ctxt.getNextSPSPathId()) != null) { + T trackId; + while ((trackId = ctxt.getNextSPSPath()) != null) { try { // Remove xAttr for file ctxt.removeSPSHint(trackId); @@ -256,17 +261,17 @@ public class BlockStorageMovementNeeded { // File's directly added to storageMovementNeeded, So try to remove // xAttr for file - ItemInfo itemInfo; + ItemInfo<T> itemInfo; while ((itemInfo = get()) != null) { try { // Remove xAttr for file if (!itemInfo.isDir()) { - ctxt.removeSPSHint(itemInfo.getFileId()); + ctxt.removeSPSHint(itemInfo.getFile()); } } catch (IOException ie) { LOG.warn( "Failed to remove SPS xattr for track id " - + itemInfo.getFileId(), ie); + + itemInfo.getFile(), ie); } } this.clearAll(); @@ -282,29 +287,29 @@ public class BlockStorageMovementNeeded { public void run() { LOG.info("Starting SPSPathIdProcessor!."); long lastStatusCleanTime = 0; - Long startINodeId = null; + T startINode = null; while (ctxt.isRunning()) { try { if (!ctxt.isInSafeMode()) { - if (startINodeId == null) { - startINodeId = ctxt.getNextSPSPathId(); + if (startINode == null) { + startINode = ctxt.getNextSPSPath(); } // else same id will be retried - if (startINodeId == null) { + if (startINode == null) { // Waiting for SPS path Thread.sleep(3000); } else { - spsStatus.put(startINodeId, + spsStatus.put(startINode, new StoragePolicySatisfyPathStatusInfo( StoragePolicySatisfyPathStatus.IN_PROGRESS)); - fileIDCollector.scanAndCollectFileIds(startINodeId); + fileCollector.scanAndCollectFiles(startINode); // check if directory was empty and no child added to queue DirPendingWorkInfo dirPendingWorkInfo = - pendingWorkForDirectory.get(startINodeId); + pendingWorkForDirectory.get(startINode); if (dirPendingWorkInfo != null && dirPendingWorkInfo.isDirWorkDone()) { - ctxt.removeSPSHint(startINodeId); - pendingWorkForDirectory.remove(startINodeId); - updateStatus(startINodeId, true); + ctxt.removeSPSHint(startINode); + pendingWorkForDirectory.remove(startINode); + updateStatus(startINode, true); } } //Clear the SPS status if status is in SUCCESS more than 5 min. @@ -313,7 +318,7 @@ public class BlockStorageMovementNeeded { lastStatusCleanTime = Time.monotonicNow(); cleanSPSStatus(); } - startINodeId = null; // Current inode id successfully scanned. + startINode = null; // Current inode successfully scanned. } } catch (Throwable t) { String reClass = t.getClass().getName(); @@ -334,9 +339,9 @@ public class BlockStorageMovementNeeded { } private synchronized void cleanSPSStatus() { - for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it = - spsStatus.entrySet().iterator(); it.hasNext();) { - Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next(); + for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus + .entrySet().iterator(); it.hasNext();) { + Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next(); if (entry.getValue().canRemove()) { it.remove(); } @@ -472,8 +477,8 @@ public class BlockStorageMovementNeeded { return statusClearanceElapsedTimeMs; } - public void markScanCompletedForDir(Long inodeId) { - DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId); + public void markScanCompletedForDir(T inode) { + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode); if (pendingWork != null) { pendingWork.markScanCompleted(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java index ff4ad6b..84a969d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java @@ -33,11 +33,16 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AccessControlException; /** - * An interface for the communication between NameNode and SPS module. + * An interface for the communication between SPS and Namenode module. + * + * @param <T> + * is identifier of inode or full path name of inode. Internal sps will + * use the file inodeId for the block movement. External sps will use + * file string path representation for the block movement. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface Context { +public interface Context<T> { /** * Returns true if the SPS is running, false otherwise. @@ -72,13 +77,13 @@ public interface Context { NetworkTopology getNetworkTopology(); /** - * Returns true if the give Inode exists in the Namespace. + * Returns true if the give file exists in the Namespace. * - * @param inodeId - * - Inode ID - * @return true if Inode exists, false otherwise. + * @param filePath + * - file info + * @return true if the given file exists, false otherwise. */ - boolean isFileExist(long inodeId); + boolean isFileExist(T filePath); /** * Gets the storage policy details for the given policy ID. @@ -97,11 +102,11 @@ public interface Context { /** * Remove the hint which was added to track SPS call. * - * @param inodeId - * - Inode ID + * @param spsPath + * - user invoked satisfier path * @throws IOException */ - void removeSPSHint(long inodeId) throws IOException; + void removeSPSHint(T spsPath) throws IOException; /** * Gets the number of live datanodes in the cluster. @@ -113,11 +118,11 @@ public interface Context { /** * Get the file info for a specific file. * - * @param inodeID - * inode identifier + * @param file + * file path * @return file status metadata information */ - HdfsFileStatus getFileInfo(long inodeID) throws IOException; + HdfsFileStatus getFileInfo(T file) throws IOException; /** * Returns all the live datanodes and its storage details. @@ -128,15 +133,6 @@ public interface Context { throws IOException; /** - * Returns true if the given inode file has low redundancy blocks. - * - * @param inodeID - * inode identifier - * @return true if block collection has low redundancy blocks - */ - boolean hasLowRedundancyBlocks(long inodeID); - - /** * Checks whether the given datanode has sufficient space to occupy the given * blockSize data. * @@ -153,26 +149,17 @@ public interface Context { long blockSize); /** - * @return next SPS path id to process. + * @return next SPS path info to process. */ - Long getNextSPSPathId(); + T getNextSPSPath(); /** * Removes the SPS path id. */ - void removeSPSPathId(long pathId); + void removeSPSPathId(T pathId); /** * Removes all SPS path ids. */ void removeAllSPSPathIds(); - - /** - * Gets the file path for a given inode id. - * - * @param inodeId - * - path inode id. - */ - String getFilePath(Long inodeId); - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java new file mode 100644 index 0000000..dceb5fa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode.sps; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An interface for scanning the directory recursively and collect files + * under the given directory. + * + * @param <T> + * is identifier of inode or full path name of inode. Internal sps will + * use the file inodeId for the block movement. External sps will use + * file string path representation for the block movement. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FileCollector<T> { + + /** + * This method can be used to scan and collects the files under that + * directory and adds to the given BlockStorageMovementNeeded. + * + * @param filePath + * - file path + */ + void scanAndCollectFiles(T filePath) + throws IOException, InterruptedException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java deleted file mode 100644 index 7cf77f0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.namenode.sps; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * An interface for scanning the directory recursively and collect file ids - * under the given directory. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface FileIdCollector { - - /** - * Scans the given inode directory and collects the file ids under that - * directory and adds to the given BlockStorageMovementNeeded. - * - * @param inodeID - * - The directory ID - */ - void scanAndCollectFileIds(Long inodeId) - throws IOException, InterruptedException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index 495d1c4..f6b6d95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -47,17 +47,17 @@ import org.slf4j.LoggerFactory; * movements to satisfy the storage policy. */ @InterfaceAudience.Private -public class IntraSPSNameNodeContext implements Context { +public class IntraSPSNameNodeContext implements Context<Long> { private static final Logger LOG = LoggerFactory .getLogger(IntraSPSNameNodeContext.class); private final Namesystem namesystem; private final BlockManager blockManager; - private SPSService service; + private SPSService<Long> service; public IntraSPSNameNodeContext(Namesystem namesystem, - BlockManager blockManager, SPSService service) { + BlockManager blockManager, SPSService<Long> service) { this.namesystem = namesystem; this.blockManager = blockManager; this.service = service; @@ -68,20 +68,18 @@ public class IntraSPSNameNodeContext implements Context { return blockManager.getDatanodeManager().getNumLiveDataNodes(); } + /** + * @return object containing information regarding the file or null if file + * not found. + */ @Override - public HdfsFileStatus getFileInfo(long inodeID) throws IOException { + public HdfsFileStatus getFileInfo(Long inodeID) throws IOException { String filePath = namesystem.getFilePath(inodeID); if (StringUtils.isBlank(filePath)) { LOG.debug("File with inodeID:{} doesn't exists!", inodeID); return null; } - HdfsFileStatus fileInfo = null; - try { - fileInfo = namesystem.getFileInfo(filePath, true, true); - } catch (IOException e) { - LOG.debug("File path:{} doesn't exists!", filePath); - } - return fileInfo; + return namesystem.getFileInfo(filePath, true, true); } @Override @@ -97,17 +95,12 @@ public class IntraSPSNameNodeContext implements Context { } @Override - public boolean hasLowRedundancyBlocks(long inodeId) { - return blockManager.hasLowRedundancyBlocks(inodeId); - } - - @Override - public boolean isFileExist(long inodeId) { + public boolean isFileExist(Long inodeId) { return namesystem.getFSDirectory().getInode(inodeId) != null; } @Override - public void removeSPSHint(long inodeId) throws IOException { + public void removeSPSHint(Long inodeId) throws IOException { this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY); } @@ -177,12 +170,12 @@ public class IntraSPSNameNodeContext implements Context { } @Override - public Long getNextSPSPathId() { + public Long getNextSPSPath() { return blockManager.getSPSManager().getNextPathId(); } @Override - public void removeSPSPathId(long trackId) { + public void removeSPSPathId(Long trackId) { blockManager.getSPSManager().removePathId(trackId); } @@ -190,10 +183,4 @@ public class IntraSPSNameNodeContext implements Context { public void removeAllSPSPathIds() { blockManager.getSPSManager().removeAllPathIds(); } - - @Override - public String getFilePath(Long inodeId) { - return namesystem.getFilePath(inodeId); - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java index 7a44dd9..27d9e7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java @@ -35,15 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode; */ @InterfaceAudience.Private public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser - implements FileIdCollector { + implements FileCollector<Long> { private int maxQueueLimitToScan; - private final SPSService service; + private final SPSService <Long> service; private int remainingCapacity = 0; - private List<ItemInfo> currentBatch; + private List<ItemInfo<Long>> currentBatch; - public IntraSPSNameNodeFileIdCollector(FSDirectory dir, SPSService service) { + public IntraSPSNameNodeFileIdCollector(FSDirectory dir, + SPSService<Long> service) { super(dir); this.service = service; this.maxQueueLimitToScan = service.getConf().getInt( @@ -63,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser return false; } if (inode.isFile() && inode.asFile().numBlocks() != 0) { - currentBatch.add(new ItemInfo( + currentBatch.add(new ItemInfo<Long>( ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); remainingCapacity--; } @@ -83,10 +84,10 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser } @Override - protected void submitCurrentBatch(long startId) + protected void submitCurrentBatch(Long startId) throws IOException, InterruptedException { // Add current child's to queue - service.addAllFileIdsToProcess(startId, + service.addAllFilesToProcess(startId, currentBatch, false); currentBatch.clear(); } @@ -119,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser } @Override - public void scanAndCollectFileIds(final Long startINodeId) + public void scanAndCollectFiles(final Long startINodeId) throws IOException, InterruptedException { FSDirectory fsd = getFSDirectory(); INode startInode = fsd.getInode(startINodeId); @@ -129,9 +130,9 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser throttle(); } if (startInode.isFile()) { - currentBatch.add(new ItemInfo(startInode.getId(), startInode.getId())); + currentBatch + .add(new ItemInfo<Long>(startInode.getId(), startInode.getId())); } else { - readLock(); // NOTE: this lock will not be held for full directory scanning. It is // basically a sliced locking. Once it collects a batch size( at max the @@ -148,7 +149,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser } } // Mark startInode traverse is done, this is last-batch - service.addAllFileIdsToProcess(startInode.getId(), currentBatch, true); + service.addAllFilesToProcess(startInode.getId(), currentBatch, true); currentBatch.clear(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java index 47c64cc..bd8ab92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java @@ -21,48 +21,51 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; /** - * ItemInfo is a file info object for which need to satisfy the policy. + * ItemInfo is a file info object for which need to satisfy the policy. For + * internal satisfier service, it uses inode id which is Long datatype. For the + * external satisfier service, it uses the full string representation of the + * path. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ItemInfo { - private long startId; - private long fileId; +public class ItemInfo<T> { + private T startPath; + private T file; private int retryCount; - public ItemInfo(long startId, long fileId) { - this.startId = startId; - this.fileId = fileId; + public ItemInfo(T startPath, T file) { + this.startPath = startPath; + this.file = file; // set 0 when item is getting added first time in queue. this.retryCount = 0; } - public ItemInfo(final long startId, final long fileId, final int retryCount) { - this.startId = startId; - this.fileId = fileId; + public ItemInfo(final T startPath, final T file, final int retryCount) { + this.startPath = startPath; + this.file = file; this.retryCount = retryCount; } /** - * Return the start inode id of the current track Id. This indicates that SPS - * was invoked on this inode id. + * Returns the start path of the current file. This indicates that SPS + * was invoked on this path. */ - public long getStartId() { - return startId; + public T getStartPath() { + return startPath; } /** - * Return the File inode Id for which needs to satisfy the policy. + * Returns the file for which needs to satisfy the policy. */ - public long getFileId() { - return fileId; + public T getFile() { + return file; } /** * Returns true if the tracking path is a directory, false otherwise. */ public boolean isDir() { - return (startId != fileId); + return !startPath.equals(file); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index da6e365..71d8fd1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -27,10 +27,15 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; /** * An interface for SPSService, which exposes life cycle and processing APIs. + * + * @param <T> + * is identifier of inode or full path name of inode. Internal sps will + * use the file inodeId for the block movement. External sps will use + * file string path representation for the block movement. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public interface SPSService { +public interface SPSService<T> { /** * Initializes the helper services. @@ -38,7 +43,7 @@ public interface SPSService { * @param ctxt * - context is an helper service to provide communication channel * between NN and SPS - * @param fileIDCollector + * @param fileCollector * - a helper service for scanning the files under a given directory * id * @param handler @@ -46,7 +51,7 @@ public interface SPSService { * @param blkMovementListener * - listener to know about block movement attempt completion */ - void init(Context ctxt, FileIdCollector fileIDCollector, + void init(Context<T> ctxt, FileCollector<T> fileCollector, BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener); /** @@ -82,23 +87,24 @@ public interface SPSService { boolean isRunning(); /** - * Adds the Item information(file id etc) to processing queue. + * Adds the Item information(file etc) to processing queue. * * @param itemInfo + * file info object for which need to satisfy the policy */ - void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted); + void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted); /** - * Adds all the Item information(file id etc) to processing queue. + * Adds all the Item information(file etc) to processing queue. * - * @param startId - * - directory/file id, on which SPS was called. + * @param startPath + * - directory/file, on which SPS was called. * @param itemInfoList * - list of item infos * @param scanCompleted * - whether the scanning of directory fully done with itemInfoList */ - void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList, + void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList, boolean scanCompleted); /** @@ -109,7 +115,7 @@ public interface SPSService { /** * Clear inodeId present in the processing queue. */ - void clearQueue(long inodeId); + void clearQueue(T spsPath); /** * @return the configuration. @@ -119,10 +125,10 @@ public interface SPSService { /** * Marks the scanning of directory if finished. * - * @param inodeId - * - directory inode id. + * @param spsPath + * - satisfier path */ - void markScanCompletedForPath(Long inodeId); + void markScanCompletedForPath(T spsPath); /** * Notify the details of storage movement attempt finished blocks. http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index 6b449aa..08a26e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -66,7 +66,7 @@ import com.google.common.base.Preconditions; * storage policy type in Namespace, but physical block storage movement will * not happen until user runs "Mover Tool" explicitly for such files. The * StoragePolicySatisfier Daemon thread implemented for addressing the case - * where users may want to physically move the blocks by a dedidated daemon (can + * where users may want to physically move the blocks by a dedicated daemon (can * run inside Namenode or stand alone) instead of running mover tool explicitly. * Just calling client API to satisfyStoragePolicy on a file/dir will * automatically trigger to move its physical storage locations as expected in @@ -77,19 +77,19 @@ import com.google.common.base.Preconditions; * physical block movements. */ @InterfaceAudience.Private -public class StoragePolicySatisfier implements SPSService, Runnable { +public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { public static final Logger LOG = LoggerFactory.getLogger(StoragePolicySatisfier.class); private Daemon storagePolicySatisfierThread; - private BlockStorageMovementNeeded storageMovementNeeded; - private BlockStorageMovementAttemptedItems storageMovementsMonitor; + private BlockStorageMovementNeeded<T> storageMovementNeeded; + private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor; private volatile boolean isRunning = false; private volatile StoragePolicySatisfierMode spsMode = StoragePolicySatisfierMode.NONE; private int spsWorkMultiplier; private long blockCount = 0L; private int blockMovementMaxRetry; - private Context ctxt; + private Context<T> ctxt; private BlockMoveTaskHandler blockMoveTaskHandler; private final Configuration conf; @@ -135,15 +135,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } } - public void init(final Context context, final FileIdCollector fileIDCollector, + public void init(final Context<T> context, + final FileCollector<T> fileIDCollector, final BlockMoveTaskHandler blockMovementTaskHandler, final BlockMovementListener blockMovementListener) { this.ctxt = context; - this.storageMovementNeeded = - new BlockStorageMovementNeeded(context, fileIDCollector); - this.storageMovementsMonitor = - new BlockStorageMovementAttemptedItems(this, - storageMovementNeeded, blockMovementListener); + this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context, + fileIDCollector); + this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>( + this, storageMovementNeeded, blockMovementListener); this.blockMoveTaskHandler = blockMovementTaskHandler; this.spsWorkMultiplier = getSPSWorkMultiplier(getConf()); this.blockMovementMaxRetry = getConf().getInt( @@ -257,24 +257,24 @@ public class StoragePolicySatisfier implements SPSService, Runnable { continue; } try { + ItemInfo<T> itemInfo = null; + boolean retryItem = false; if (!ctxt.isInSafeMode()) { - ItemInfo itemInfo = storageMovementNeeded.get(); + itemInfo = storageMovementNeeded.get(); if (itemInfo != null) { if(itemInfo.getRetryCount() >= blockMovementMaxRetry){ LOG.info("Failed to satisfy the policy after " + blockMovementMaxRetry + " retries. Removing inode " - + itemInfo.getFileId() + " from the queue"); + + itemInfo.getFile() + " from the queue"); storageMovementNeeded.removeItemTrackInfo(itemInfo, false); continue; } - long trackId = itemInfo.getFileId(); + T trackId = itemInfo.getFile(); BlocksMovingAnalysis status = null; DatanodeStorageReport[] liveDnReports; BlockStoragePolicy existingStoragePolicy; // TODO: presently, context internally acquire the lock // and returns the result. Need to discuss to move the lock outside? - boolean hasLowRedundancyBlocks = ctxt - .hasLowRedundancyBlocks(trackId); HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId); // Check path existence. if (fileStatus == null || fileStatus.isDir()) { @@ -289,7 +289,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus; status = analyseBlocksStorageMovementsAndAssignToDN(file, - hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports); + existingStoragePolicy, liveDnReports); switch (status.status) { // Just add to monitor, so it will be retried after timeout case ANALYSIS_SKIPPED_FOR_RETRY: @@ -302,8 +302,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable { + "movement attempt finished report", status.status, fileStatus.getPath()); } - this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo - .getStartId(), itemInfo.getFileId(), monotonicNow(), + this.storageMovementsMonitor.add(new AttemptedItemInfo<T>( + itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(), status.assignedBlocks, itemInfo.getRetryCount())); break; case NO_BLOCKS_TARGETS_PAIRED: @@ -312,8 +312,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { + " retry queue as none of the blocks found its eligible" + " targets.", trackId, fileStatus.getPath()); } - itemInfo.increRetryCount(); - this.storageMovementNeeded.add(itemInfo); + retryItem = true; break; case FEW_LOW_REDUNDANCY_BLOCKS: if (LOG.isDebugEnabled()) { @@ -321,8 +320,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { + "retry queue as some of the blocks are low redundant.", trackId, fileStatus.getPath()); } - itemInfo.increRetryCount(); - this.storageMovementNeeded.add(itemInfo); + retryItem = true; break; case BLOCKS_FAILED_TO_MOVE: if (LOG.isDebugEnabled()) { @@ -330,7 +328,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { + "retry queue as some of the blocks movement failed.", trackId, fileStatus.getPath()); } - this.storageMovementNeeded.add(itemInfo); + retryItem = true; break; // Just clean Xattrs case BLOCKS_TARGET_PAIRING_SKIPPED: @@ -354,6 +352,10 @@ public class StoragePolicySatisfier implements SPSService, Runnable { Thread.sleep(3000); blockCount = 0L; } + if (retryItem) { + itemInfo.increRetryCount(); + this.storageMovementNeeded.add(itemInfo); + } } catch (IOException e) { LOG.error("Exception during StoragePolicySatisfier execution - " + "will continue next cycle", e); @@ -377,7 +379,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN( - HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks, + HdfsLocatedFileStatus fileInfo, BlockStoragePolicy existingStoragePolicy, DatanodeStorageReport[] liveDns) { BlocksMovingAnalysis.Status status = @@ -403,9 +405,17 @@ public class StoragePolicySatisfier implements SPSService, Runnable { new ArrayList<>()); } List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>(); - + boolean hasLowRedundancyBlocks = false; + int replication = fileInfo.getReplication(); for (int i = 0; i < blocks.size(); i++) { LocatedBlock blockInfo = blocks.get(i); + + // Block is considered as low redundancy when the block locations array + // length is less than expected replication factor. If any of the block is + // low redundant, then hasLowRedundancyBlocks will be marked as true. + hasLowRedundancyBlocks |= isLowRedundancyBlock(blockInfo, replication, + ecPolicy); + List<StorageType> expectedStorageTypes; if (blockInfo.isStriped()) { if (ErasureCodingPolicyManager @@ -446,13 +456,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable { // policy. status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED; } - } else if (hasLowRedundancyBlocks - && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) { - // Check if the previous block was successfully paired. - status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; } } + // If there is no block paired and few blocks are low redundant, so marking + // the status as FEW_LOW_REDUNDANCY_BLOCKS. + if (hasLowRedundancyBlocks + && status == BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED) { + status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; + } List<Block> assignedBlockIds = new ArrayList<Block>(); for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { // Check for at least one block storage movement has been chosen @@ -471,6 +483,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } /** + * The given block is considered as low redundancy when the block locations + * length is less than expected replication factor. For EC blocks, redundancy + * is the summation of data + parity blocks. + * + * @param blockInfo + * block + * @param replication + * replication factor of the given file block + * @param ecPolicy + * erasure coding policy of the given file block + * @return true if the given block is low redundant. + */ + private boolean isLowRedundancyBlock(LocatedBlock blockInfo, int replication, + ErasureCodingPolicy ecPolicy) { + boolean hasLowRedundancyBlock = false; + if (blockInfo.isStriped()) { + // For EC blocks, redundancy is the summation of data + parity blocks. + replication = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(); + } + // block is considered as low redundancy when the block locations length is + // less than expected replication factor. + hasLowRedundancyBlock = blockInfo.getLocations().length < replication ? true + : false; + return hasLowRedundancyBlock; + } + + /** * Compute the list of block moving information corresponding to the given * blockId. This will check that each block location of the given block is * satisfying the expected storage policy. If block location is not satisfied @@ -863,7 +902,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @VisibleForTesting - BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() { + public BlockStorageMovementAttemptedItems<T> getAttemptedItemsMonitor() { return storageMovementsMonitor; } @@ -880,7 +919,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { /** * Clear queues for given track id. */ - public void clearQueue(long trackId) { + public void clearQueue(T trackId) { storageMovementNeeded.clearQueue(trackId); } @@ -889,7 +928,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { * attempted or reported time stamp. This is used by * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. */ - final static class AttemptedItemInfo extends ItemInfo { + final static class AttemptedItemInfo<T> extends ItemInfo<T> { private long lastAttemptedOrReportedTime; private final List<Block> blocks; @@ -903,7 +942,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { * @param lastAttemptedOrReportedTime * last attempted or reported time */ - AttemptedItemInfo(long rootId, long trackId, + AttemptedItemInfo(T rootId, T trackId, long lastAttemptedOrReportedTime, List<Block> blocks, int retryCount) { super(rootId, trackId, retryCount); @@ -932,24 +971,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } + /** + * Returns sps invoked path status. This method is used by internal satisfy + * storage policy service. + * + * @param path + * sps path + * @return storage policy satisfy path status + * @throws IOException + */ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus( String path) throws IOException { return storageMovementNeeded.getStatus(ctxt.getFileID(path)); } @Override - public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) { + public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) { storageMovementNeeded.add(trackInfo, scanCompleted); if (LOG.isDebugEnabled()) { LOG.debug("Added track info for inode {} to block " - + "storageMovementNeeded queue", trackInfo.getFileId()); + + "storageMovementNeeded queue", trackInfo.getFile()); } } @Override - public void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList, + public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList, boolean scanCompleted) { - getStorageMovementQueue().addAll(startId, itemInfoList, scanCompleted); + getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted); } @Override @@ -963,12 +1011,12 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @VisibleForTesting - public BlockStorageMovementNeeded getStorageMovementQueue() { + public BlockStorageMovementNeeded<T> getStorageMovementQueue() { return storageMovementNeeded; } @Override - public void markScanCompletedForPath(Long inodeId) { + public void markScanCompletedForPath(T inodeId) { getStorageMovementQueue().markScanCompletedForDir(inodeId); } @@ -976,7 +1024,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable { * Join main SPS thread. */ public void join() throws InterruptedException { - //TODO Add join here on SPS rpc server also storagePolicySatisfierThread.join(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java index 5bdf6ae..5ec0372 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java @@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory; public class StoragePolicySatisfyManager { private static final Logger LOG = LoggerFactory .getLogger(StoragePolicySatisfyManager.class); - private final StoragePolicySatisfier spsService; + private final StoragePolicySatisfier<Long> spsService; private final boolean storagePolicyEnabled; private volatile StoragePolicySatisfierMode mode; private final Queue<Long> pathsToBeTraveresed; @@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager { pathsToBeTraveresed = new LinkedList<Long>(); // instantiate SPS service by just keeps config reference and not starting // any supporting threads. - spsService = new StoragePolicySatisfier(conf); + spsService = new StoragePolicySatisfier<Long>(conf); this.namesystem = namesystem; this.blkMgr = blkMgr; } @@ -309,7 +309,7 @@ public class StoragePolicySatisfyManager { /** * @return internal SPS service instance. */ - public SPSService getInternalSPSService() { + public SPSService<Long> getInternalSPSService() { return this.spsService; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java index 9f5cadd..615e297 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java @@ -206,21 +206,11 @@ public interface NamenodeProtocol { boolean isRollingUpgrade() throws IOException; /** - * Gets the file path for the given file id. This API used by External SPS. - * - * @param inodeId - * - file inode id. - * @return path - */ - @Idempotent - String getFilePath(Long inodeId) throws IOException; - - /** - * @return Gets the next available sps path id, otherwise null. This API used + * @return Gets the next available sps path, otherwise null. This API used * by External SPS. */ @AtMostOnce - Long getNextSPSPathId() throws IOException; + String getNextSPSPath() throws IOException; /** * Verifies whether the given Datanode has the enough estimated size with @@ -236,15 +226,5 @@ public interface NamenodeProtocol { @Idempotent boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, long estimatedSize) throws IOException; - - /** - * Check if any low redundancy blocks for given file id. This API used by - * External SPS. - * - * @param inodeID - * - inode id. - */ - @Idempotent - boolean hasLowRedundancyBlocks(long inodeID) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java index 4a762649..7580ba9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java @@ -81,11 +81,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler { private final SaslDataTransferClient saslClient; private final BlockStorageMovementTracker blkMovementTracker; private Daemon movementTrackerThread; - private final SPSService service; + private final SPSService<String> service; private final BlockDispatcher blkDispatcher; public ExternalSPSBlockMoveTaskHandler(Configuration conf, - NameNodeConnector nnc, SPSService spsService) { + NameNodeConnector nnc, SPSService<String> spsService) { int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); moveExecutor = initializeBlockMoverThreadPool(moverThreads); http://git-wip-us.apache.org/repos/asf/hadoop/blob/79925028/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java index c309209..5d0aee6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.sps; +import java.io.FileNotFoundException; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; @@ -30,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -46,15 +48,15 @@ import org.slf4j.LoggerFactory; * SPS from Namenode state. */ @InterfaceAudience.Private -public class ExternalSPSContext implements Context { +public class ExternalSPSContext implements Context<String> { public static final Logger LOG = LoggerFactory.getLogger(ExternalSPSContext.class); - private SPSService service; + private SPSService<String> service; private NameNodeConnector nnc = null; private BlockStoragePolicySuite createDefaultSuite = BlockStoragePolicySuite.createDefaultSuite(); - public ExternalSPSContext(SPSService service, NameNodeConnector nnc) { + public ExternalSPSContext(SPSService<String> service, NameNodeConnector nnc) { this.service = service; this.nnc = nnc; } @@ -110,14 +112,12 @@ public class ExternalSPSContext implements Context { } @Override - public boolean isFileExist(long inodeId) { - String filePath = null; + public boolean isFileExist(String filePath) { try { - filePath = getFilePath(inodeId); return nnc.getDistributedFileSystem().exists(new Path(filePath)); } catch (IllegalArgumentException | IOException e) { - LOG.warn("Exception while getting file is for the given path:{} " - + "and fileId:{}", filePath, inodeId, e); + LOG.warn("Exception while getting file is for the given path:{}", + filePath, e); } return false; } @@ -133,8 +133,8 @@ public class ExternalSPSContext implements Context { } @Override - public void removeSPSHint(long inodeId) throws IOException { - nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)), + public void removeSPSHint(String inodeId) throws IOException { + nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId), HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY); } @@ -150,9 +150,15 @@ public class ExternalSPSContext implements Context { } @Override - public HdfsFileStatus getFileInfo(long inodeID) throws IOException { - return nnc.getDistributedFileSystem().getClient() - .getLocatedFileInfo(getFilePath(inodeID), false); + public HdfsFileStatus getFileInfo(String path) throws IOException { + HdfsLocatedFileStatus fileInfo = null; + try { + fileInfo = nnc.getDistributedFileSystem().getClient() + .getLocatedFileInfo(path, false); + } catch (FileNotFoundException e) { + LOG.debug("Path:{} doesn't exists!", path, e); + } + return fileInfo; } @Override @@ -162,17 +168,6 @@ public class ExternalSPSContext implements Context { } @Override - public boolean hasLowRedundancyBlocks(long inodeID) { - try { - return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID); - } catch (IOException e) { - LOG.warn("Failed to check whether fileid:{} has low redundancy blocks.", - inodeID, e); - return false; - } - } - - @Override public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type, long estimatedSize) { // TODO: Instead of calling namenode for checking the available space, it @@ -190,9 +185,9 @@ public class ExternalSPSContext implements Context { } @Override - public Long getNextSPSPathId() { + public String getNextSPSPath() { try { - return nnc.getNNProtocolConnection().getNextSPSPathId(); + return nnc.getNNProtocolConnection().getNextSPSPath(); } catch (IOException e) { LOG.warn("Exception while getting next sps path id from Namenode.", e); return null; @@ -200,7 +195,7 @@ public class ExternalSPSContext implements Context { } @Override - public void removeSPSPathId(long pathId) { + public void removeSPSPathId(String pathId) { // We need not specifically implement for external. } @@ -208,15 +203,4 @@ public class ExternalSPSContext implements Context { public void removeAllSPSPathIds() { // We need not specifically implement for external. } - - @Override - public String getFilePath(Long inodeId) { - try { - return nnc.getNNProtocolConnection().getFilePath(inodeId); - } catch (IOException e) { - LOG.warn("Exception while getting file path id:{} from Namenode.", - inodeId, e); - return null; - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org