HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ff900eb6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ff900eb6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ff900eb6 Branch: refs/heads/branch-2 Commit: ff900eb64a59e216af6073c769f9acdff8d4f812 Parents: b1aad1d Author: Arpit Agarwal <a...@apache.org> Authored: Mon Feb 9 12:17:40 2015 -0800 Committer: Arpit Agarwal <a...@apache.org> Committed: Mon Feb 9 12:17:53 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/net/NetworkTopology.java | 2 +- .../net/NetworkTopologyWithNodeGroup.java | 2 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/protocol/LocatedBlock.java | 77 ++++++++++++++---- .../server/blockmanagement/DatanodeManager.java | 2 + .../protocol/DatanodeInfoWithStorage.java | 59 ++++++++++++++ .../apache/hadoop/hdfs/TestDecommission.java | 10 ++- .../blockmanagement/TestDatanodeManager.java | 84 ++++++++++++++++++++ 8 files changed, 218 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff900eb6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index a11ba9c..02b0005 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -860,7 +860,7 @@ public class NetworkTopology { // Start off by initializing to off rack int weight = 2; if (reader != null) { - if (reader == node) { + if 
(reader.equals(node)) { weight = 0; } else if (isOnSameRack(reader, node)) { weight = 1; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff900eb6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java index 13160eb..3de49dc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java @@ -254,7 +254,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology { // Start off by initializing to off rack int weight = 3; if (reader != null) { - if (reader == node) { + if (reader.equals(node)) { weight = 0; } else if (isOnSameNodeGroup(reader, node)) { weight = 1; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff900eb6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 86a43e7..3278dd8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -584,6 +584,9 @@ Release 2.7.0 - UNRELEASED HDFS-7741. Remove unnecessary synchronized in FSDataInputStream and HdfsDataInputStream. (yliu) + HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos + but not StorageIDs. 
(Milan Desai via Arpit Agarwal) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff900eb6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java index 30368f6..7fb2e30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.security.token.Token; import com.google.common.collect.Lists; @@ -41,11 +42,13 @@ public class LocatedBlock { private final ExtendedBlock b; private long offset; // offset of the first byte of the block in the file - private final DatanodeInfo[] locs; - /** Storage ID for each replica */ - private final String[] storageIDs; - // Storage type for each replica, if reported. - private final StorageType[] storageTypes; + private final DatanodeInfoWithStorage[] locs; + private final boolean hasStorageIDs; + private final boolean hasStorageTypes; + /** Cached storage ID for each replica */ + private String[] storageIDs; + /** Cached storage type for each replica, if reported. */ + private StorageType[] storageTypes; // corrupt flag is true if all of the replicas of a block are corrupt. // else false. 
If block has few corrupt replicas, they are filtered and // their locations are not part of this object @@ -57,7 +60,8 @@ public class LocatedBlock { private DatanodeInfo[] cachedLocs; // Used when there are no locations - private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0]; + private static final DatanodeInfoWithStorage[] EMPTY_LOCS = + new DatanodeInfoWithStorage[0]; public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) { this(b, locs, -1, false); // startOffset is unknown @@ -94,10 +98,22 @@ public class LocatedBlock { if (locs==null) { this.locs = EMPTY_LOCS; } else { - this.locs = locs; + this.locs = new DatanodeInfoWithStorage[locs.length]; + for(int i = 0; i < locs.length; i++) { + DatanodeInfo di = locs[i]; + DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di, + storageIDs != null ? storageIDs[i] : null, + storageTypes != null ? storageTypes[i] : null); + storage.setDependentHostNames(di.getDependentHostNames()); + storage.setLevel(di.getLevel()); + storage.setParent(di.getParent()); + this.locs[i] = storage; + } } this.storageIDs = storageIDs; this.storageTypes = storageTypes; + this.hasStorageIDs = storageIDs != null; + this.hasStorageTypes = storageTypes != null; if (cachedLocs == null || cachedLocs.length == 0) { this.cachedLocs = EMPTY_LOCS; @@ -118,18 +134,53 @@ public class LocatedBlock { return b; } - public DatanodeInfo[] getLocations() { + /** + * Returns the locations associated with this block. The returned array is not + * expected to be modified. If it is, caller must immediately invoke + * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#invalidateCachedStorageInfo} + * to invalidate the cached Storage ID/Type arrays. 
+ */ + public DatanodeInfoWithStorage[] getLocations() { return locs; } public StorageType[] getStorageTypes() { + if(!hasStorageTypes) { + return null; + } + if(storageTypes != null) { + return storageTypes; + } + storageTypes = new StorageType[locs.length]; + for(int i = 0; i < locs.length; i++) { + storageTypes[i] = locs[i].getStorageType(); + } return storageTypes; } public String[] getStorageIDs() { + if(!hasStorageIDs) { + return null; + } + if(storageIDs != null) { + return storageIDs; + } + storageIDs = new String[locs.length]; + for(int i = 0; i < locs.length; i++) { + storageIDs[i] = locs[i].getStorageID(); + } return storageIDs; } + /** + * Invalidates the cached StorageID and StorageType information. Must be + * called when the locations array is modified. + */ + public void invalidateCachedStorageInfo() { + storageIDs = null; + storageTypes = null; + } + public long getStartOffset() { return offset; } @@ -161,9 +212,9 @@ public class LocatedBlock { return; } // Try to re-use a DatanodeInfo already in loc - for (int i=0; i<locs.length; i++) { - if (locs[i].equals(loc)) { - cachedList.add(locs[i]); + for (DatanodeInfoWithStorage di : locs) { + if (loc.equals(di)) { + cachedList.add(di); cachedLocs = cachedList.toArray(cachedLocs); return; } @@ -187,10 +238,6 @@ public class LocatedBlock { + "; corrupt=" + corrupt + "; offset=" + offset + "; locs=" + Arrays.asList(locs) - + "; storageIDs=" + - (storageIDs != null ? Arrays.asList(storageIDs) : null) - + "; storageTypes=" + - (storageTypes != null ? 
Arrays.asList(storageTypes) : null) + "}"; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff900eb6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index a31123c..d251c72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -391,6 +391,8 @@ public class DatanodeManager { } int activeLen = lastActiveIndex + 1; networktopology.sortByDistance(client, b.getLocations(), activeLen); + // must invalidate cache since we modified locations array + b.invalidateCachedStorageInfo(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff900eb6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java new file mode 100644 index 0000000..ec8c346 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +public class DatanodeInfoWithStorage extends DatanodeInfo { + private final String storageID; + private final StorageType storageType; + + public DatanodeInfoWithStorage(DatanodeInfo from, String storageID, + StorageType storageType) { + super(from); + this.storageID = storageID; + this.storageType = storageType; + } + + public String getStorageID() { + return storageID; + } + + public StorageType getStorageType() { + return storageType; + } + + @Override + public boolean equals(Object o) { + // allows this class to be used interchangeably with DatanodeInfo + return super.equals(o); + } + + @Override + public int hashCode() { + // allows this class to be used interchangeably with DatanodeInfo + return super.hashCode(); + } + + @Override + public String toString() { + return "DatanodeInfoWithStorage[" + super.toString() + "," + storageID + + "," + storageType + "]"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff900eb6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index 20c3c01..0940590 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; @@ -800,22 +801,23 @@ public class TestDecommission { ArrayList<String> nodes = new ArrayList<String>(); ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>(); - + + DatanodeManager dm = ns.getBlockManager().getDatanodeManager(); for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) { DatanodeInfo found = datanodeInfo; for (DatanodeInfo dif: dnInfos4LastBlock) { if (datanodeInfo.equals(dif)) { - found = null; + found = null; } } if (found != null) { nodes.add(found.getXferAddr()); - dnInfos.add(found); + dnInfos.add(dm.getDatanode(found)); } } //decommission one of the 3 nodes which have last block nodes.add(dnInfos4LastBlock[0].getXferAddr()); - dnInfos.add(dnInfos4LastBlock[0]); + dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0])); writeConfigFile(excludeFile, nodes); refreshNodes(ns, conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff900eb6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index 2c65fff..adf31a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -31,13 +32,19 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.net.DNSToSwitchMapping; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.*; public class TestDatanodeManager { @@ -210,4 +217,81 @@ public class TestDatanodeManager { public void reloadCachedMappings(List<String> names) { } } + + /** + * This test creates a LocatedBlock with 5 locations, sorts the locations + * based on the network topology, and ensures the locations are still aligned + * with the storage ids and storage types. 
+ */ + @Test + public void testSortLocatedBlocks() throws IOException { + // create the DatanodeManager which will be tested + FSNamesystem fsn = Mockito.mock(FSNamesystem.class); + Mockito.when(fsn.hasWriteLock()).thenReturn(true); + DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class), + fsn, new Configuration()); + + // register 5 datanodes, each with different storage ID and type + DatanodeInfo[] locs = new DatanodeInfo[5]; + String[] storageIDs = new String[5]; + StorageType[] storageTypes = new StorageType[]{ + StorageType.ARCHIVE, + StorageType.DEFAULT, + StorageType.DISK, + StorageType.RAM_DISK, + StorageType.SSD + }; + for(int i = 0; i < 5; i++) { + // register new datanode + String uuid = "UUID-"+i; + String ip = "IP-" + i; + DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class); + Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid); + Mockito.when(dr.getIpAddr()).thenReturn(ip); + Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000"); + Mockito.when(dr.getXferPort()).thenReturn(9000); + Mockito.when(dr.getSoftwareVersion()).thenReturn("version1"); + dm.registerDatanode(dr); + + // get location and storage information + locs[i] = dm.getDatanode(uuid); + storageIDs[i] = "storageID-"+i; + } + + // set first 2 locations as decommissioned + locs[0].setDecommissioned(); + locs[1].setDecommissioned(); + + // create LocatedBlock with above locations + ExtendedBlock b = new ExtendedBlock("somePoolID", 1234); + LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes); + List<LocatedBlock> blocks = new ArrayList<>(); + blocks.add(block); + + final String targetIp = locs[4].getIpAddr(); + + // sort block locations + dm.sortLocatedBlocks(targetIp, blocks); + + // check that storage IDs/types are aligned with datanode locs + DatanodeInfoWithStorage[] sortedLocs = block.getLocations(); + storageIDs = block.getStorageIDs(); + storageTypes = block.getStorageTypes(); + assertThat(sortedLocs.length, is(5)); +
assertThat(storageIDs.length, is(5)); + assertThat(storageTypes.length, is(5)); + for(int i = 0; i < sortedLocs.length; i++) { + assertThat(sortedLocs[i].getStorageID(), is(storageIDs[i])); + assertThat(sortedLocs[i].getStorageType(), is(storageTypes[i])); + } + + // Ensure the local node is first. + assertThat(sortedLocs[0].getIpAddr(), is(targetIp)); + + // Ensure the two decommissioned DNs were moved to the end. + assertThat(sortedLocs[sortedLocs.length-1].getAdminState(), + is(DatanodeInfo.AdminStates.DECOMMISSIONED)); + assertThat(sortedLocs[sortedLocs.length-2].getAdminState(), + is(DatanodeInfo.AdminStates.DECOMMISSIONED)); + } }