http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java new file mode 100644 index 0000000..3915c56 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver.extractTempFileName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the multiple destination resolver. + */ +public class TestMultipleDestinationResolver { + + private MultipleDestinationMountTableResolver resolver; + + @Before + public void setup() throws IOException { + Configuration conf = new Configuration(); + resolver = new MultipleDestinationMountTableResolver(conf, null); + + // We manually point /tmp to only subcluster0 + Map<String, String> map1 = new HashMap<>(); + map1.put("subcluster0", "/tmp"); + resolver.addEntry(MountTable.newInstance("/tmp", map1)); + + // We manually point / to subcluster0,1,2 with default order (hash) + Map<String, String> mapDefault = new HashMap<>(); + mapDefault.put("subcluster0", "/"); + mapDefault.put("subcluster1", "/"); + mapDefault.put("subcluster2", "/"); + MountTable defaultEntry = MountTable.newInstance("/", mapDefault); + resolver.addEntry(defaultEntry); + + // We manually point /hash to subcluster0,1,2 with hashing + Map<String, String> mapHash = new HashMap<>(); + mapHash.put("subcluster0", "/hash"); + mapHash.put("subcluster1", "/hash"); + mapHash.put("subcluster2", "/hash"); + MountTable hashEntry = MountTable.newInstance("/hash", mapHash); + hashEntry.setDestOrder(DestinationOrder.HASH); + resolver.addEntry(hashEntry); + + // We manually point /hashall to subcluster0,1,2 with hashing (full tree) + Map<String, String> mapHashAll = new HashMap<>(); + mapHashAll.put("subcluster0", "/hashall"); + mapHashAll.put("subcluster1", "/hashall"); + mapHashAll.put("subcluster2", "/hashall"); + MountTable hashEntryAll = MountTable.newInstance("/hashall", mapHashAll); + hashEntryAll.setDestOrder(DestinationOrder.HASH_ALL); + resolver.addEntry(hashEntryAll); + + // We point /local to subclusters 0, 1, 2 with the local order + Map<String, String> mapLocal = new HashMap<>(); + mapLocal.put("subcluster0", "/local"); + mapLocal.put("subcluster1", "/local"); + mapLocal.put("subcluster2", "/local"); + MountTable localEntry = MountTable.newInstance("/local", mapLocal); + localEntry.setDestOrder(DestinationOrder.LOCAL); + resolver.addEntry(localEntry); + + // We point /random to subclusters 0, 1, 2 with the random order + Map<String, String> mapRandom = new HashMap<>(); + mapRandom.put("subcluster0", "/random"); + mapRandom.put("subcluster1", "/random"); + mapRandom.put("subcluster2", "/random"); + MountTable randomEntry = MountTable.newInstance("/random", mapRandom); + randomEntry.setDestOrder(DestinationOrder.RANDOM); + resolver.addEntry(randomEntry); + + // Read only mount point + Map<String, String> mapReadOnly = new HashMap<>(); + mapReadOnly.put("subcluster0", "/readonly"); + mapReadOnly.put("subcluster1", "/readonly"); + mapReadOnly.put("subcluster2", "/readonly"); + MountTable readOnlyEntry = MountTable.newInstance("/readonly", mapReadOnly); + readOnlyEntry.setReadOnly(true); + resolver.addEntry(readOnlyEntry); + } + + @Test + public void testHashEqualDistribution() throws IOException { + // First level + testEvenDistribution("/hash"); + testEvenDistribution("/hash/folder0", false); + + // All levels + testEvenDistribution("/hashall"); + testEvenDistribution("/hashall/folder0"); + } + + @Test + public void testHashAll() throws IOException { + // Files should be spread across subclusters + PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt"); + assertDest("subcluster0", dest0); + PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt"); + assertDest("subcluster1", dest1); + + // Files within folder should be spread across subclusters + PathLocation dest2 = resolver.getDestinationForPath("/hashall/folder0"); + assertDest("subcluster2", dest2); + PathLocation dest3 = resolver.getDestinationForPath( + "/hashall/folder0/file0.txt"); + assertDest("subcluster1", dest3); + PathLocation dest4 = resolver.getDestinationForPath( + "/hashall/folder0/file1.txt"); + assertDest("subcluster0", dest4); + + PathLocation dest5 = resolver.getDestinationForPath( + "/hashall/folder0/folder0/file0.txt"); + assertDest("subcluster1", dest5); + PathLocation dest6 = resolver.getDestinationForPath( + "/hashall/folder0/folder0/file1.txt"); + assertDest("subcluster1", dest6); + PathLocation dest7 = resolver.getDestinationForPath( + "/hashall/folder0/folder0/file2.txt"); + assertDest("subcluster0", dest7); + + PathLocation dest8 = resolver.getDestinationForPath("/hashall/folder1"); + assertDest("subcluster1", dest8); + PathLocation dest9 = resolver.getDestinationForPath( + "/hashall/folder1/file0.txt"); + assertDest("subcluster0", dest9); + PathLocation dest10 = resolver.getDestinationForPath( + "/hashall/folder1/file1.txt"); + assertDest("subcluster1", dest10); + + PathLocation dest11 = resolver.getDestinationForPath("/hashall/folder2"); + assertDest("subcluster2", dest11); + PathLocation dest12 = resolver.getDestinationForPath( + "/hashall/folder2/file0.txt"); + assertDest("subcluster0", dest12); + PathLocation dest13 = resolver.getDestinationForPath( + "/hashall/folder2/file1.txt"); + assertDest("subcluster0", dest13); + PathLocation dest14 = resolver.getDestinationForPath( + "/hashall/folder2/file2.txt"); + assertDest("subcluster1", dest14); + } + + @Test + public void testHashFirst() throws IOException { + PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt"); + assertDest("subcluster0", dest0); + PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt"); + assertDest("subcluster1", dest1); + + // All these must be in the same location: subcluster0 + PathLocation dest2 = resolver.getDestinationForPath("/hash/folder0"); + assertDest("subcluster0", dest2); + PathLocation dest3 = resolver.getDestinationForPath( + "/hash/folder0/file0.txt"); + assertDest("subcluster0", dest3); + PathLocation dest4 = resolver.getDestinationForPath( + "/hash/folder0/file1.txt"); + assertDest("subcluster0", dest4); + + PathLocation dest5 = resolver.getDestinationForPath( + "/hash/folder0/folder0/file0.txt"); + assertDest("subcluster0", dest5); + PathLocation dest6 = resolver.getDestinationForPath( + "/hash/folder0/folder0/file1.txt"); + assertDest("subcluster0", dest6); + + // All these must be in the same location: subcluster2 + PathLocation dest7 = resolver.getDestinationForPath("/hash/folder1"); + assertDest("subcluster2", dest7); + PathLocation dest8 = resolver.getDestinationForPath( + "/hash/folder1/file0.txt"); + assertDest("subcluster2", dest8); + PathLocation dest9 = resolver.getDestinationForPath( + "/hash/folder1/file1.txt"); + assertDest("subcluster2", dest9); + + // All these must be in the same location: subcluster2 + PathLocation dest10 = resolver.getDestinationForPath("/hash/folder2"); + assertDest("subcluster2", dest10); + PathLocation dest11 = resolver.getDestinationForPath( + "/hash/folder2/file0.txt"); + assertDest("subcluster2", dest11); + PathLocation dest12 = resolver.getDestinationForPath( + "/hash/folder2/file1.txt"); + assertDest("subcluster2", dest12); + } + + @Test + public void testRandomEqualDistribution() throws IOException { + testEvenDistribution("/random"); + } + + @Test + public void testSingleDestination() throws IOException { + // All the files in /tmp should be in subcluster0 + for (int f = 0; f < 100; f++) { + String filename = "/tmp/b/c/file" + f + ".txt"; + PathLocation destination = resolver.getDestinationForPath(filename); + RemoteLocation loc = destination.getDefaultLocation(); + assertEquals("subcluster0", loc.getNameserviceId()); + assertEquals(filename, loc.getDest()); + } + } + + @Test + public void testResolveSubdirectories() throws Exception { + // Simulate a testdir under a multi-destination mount. + Random r = new Random(); + String testDir = "/sort/testdir" + r.nextInt(); + String file1 = testDir + "/file1" + r.nextInt(); + String file2 = testDir + "/file2" + r.nextInt(); + + // Verify both files resolve to the same namespace as the parent dir. + PathLocation testDirLocation = resolver.getDestinationForPath(testDir); + RemoteLocation defaultLoc = testDirLocation.getDefaultLocation(); + String testDirNamespace = defaultLoc.getNameserviceId(); + + PathLocation file1Location = resolver.getDestinationForPath(file1); + RemoteLocation defaultLoc1 = file1Location.getDefaultLocation(); + assertEquals(testDirNamespace, defaultLoc1.getNameserviceId()); + + PathLocation file2Location = resolver.getDestinationForPath(file2); + RemoteLocation defaultLoc2 = file2Location.getDefaultLocation(); + assertEquals(testDirNamespace, defaultLoc2.getNameserviceId()); + } + + @Test + public void testExtractTempFileName() { + for (String teststring : new String[] { + "testfile1.txt.COPYING", + "testfile1.txt._COPYING_", + "testfile1.txt._COPYING_.attempt_1486662804109_0055_m_000042_0", + "testfile1.txt.tmp", + "_temp/testfile1.txt", + "_temporary/testfile1.txt.af77e2ab-4bc5-4959-ae08-299c880ee6b8", + "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" + + "testfile1.txt" }) { + String finalName = extractTempFileName(teststring); + assertEquals("testfile1.txt", finalName); + } + + // False cases + assertEquals( + "file1.txt.COPYING1", extractTempFileName("file1.txt.COPYING1")); + assertEquals("file1.txt.tmp2", extractTempFileName("file1.txt.tmp2")); + + // Speculation patterns + String finalName = extractTempFileName( + "_temporary/part-00007.af77e2ab-4bc5-4959-ae08-299c880ee6b8"); + assertEquals("part-00007", finalName); + finalName = extractTempFileName( + "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" + + "part-00003"); + assertEquals("part-00003", finalName); + + // Subfolders + finalName = extractTempFileName("folder0/testfile1.txt._COPYING_"); + assertEquals("folder0/testfile1.txt", finalName); + finalName = extractTempFileName( + "folder0/folder1/testfile1.txt._COPYING_"); + assertEquals("folder0/folder1/testfile1.txt", finalName); + finalName = extractTempFileName( + "processedHrsData.txt/_temporary/0/_temporary/" + + "attempt_201706281636_0007_m_000003_46/part-00003"); + assertEquals("processedHrsData.txt/part-00003", finalName); + } + + @Test + public void testReadOnly() throws IOException { + MountTable mount = resolver.getMountPoint("/readonly"); + assertTrue(mount.isReadOnly()); + + PathLocation dest0 = resolver.getDestinationForPath("/readonly/file0.txt"); + assertDest("subcluster1", dest0); + PathLocation dest1 = resolver.getDestinationForPath("/readonly/file1.txt"); + assertDest("subcluster2", dest1); + + // All these must be in the same location: subcluster0 + PathLocation dest2 = resolver.getDestinationForPath("/readonly/folder0"); + assertDest("subcluster1", dest2); + PathLocation dest3 = resolver.getDestinationForPath( + "/readonly/folder0/file0.txt"); + assertDest("subcluster1", dest3); + PathLocation dest4 = resolver.getDestinationForPath( + "/readonly/folder0/file1.txt"); + assertDest("subcluster1", dest4); + + PathLocation dest5 = resolver.getDestinationForPath( + "/readonly/folder0/folder0/file0.txt"); + assertDest("subcluster1", dest5); + PathLocation dest6 = resolver.getDestinationForPath( + "/readonly/folder0/folder0/file1.txt"); + assertDest("subcluster1", dest6); + + // All these must be in the same location: subcluster2 + PathLocation dest7 = resolver.getDestinationForPath("/readonly/folder1"); + assertDest("subcluster2", dest7); + PathLocation dest8 = resolver.getDestinationForPath( + "/readonly/folder1/file0.txt"); + assertDest("subcluster2", dest8); + PathLocation dest9 = resolver.getDestinationForPath( + "/readonly/folder1/file1.txt"); + assertDest("subcluster2", dest9); + + // All these must be in the same location: subcluster2 + PathLocation dest10 = resolver.getDestinationForPath("/readonly/folder2"); + assertDest("subcluster1", dest10); + PathLocation dest11 = resolver.getDestinationForPath( + "/readonly/folder2/file0.txt"); + assertDest("subcluster1", dest11); + PathLocation dest12 = resolver.getDestinationForPath( + "/readonly/folder2/file1.txt"); + assertDest("subcluster1", dest12); + } + + @Test + public void testLocalResolver() throws IOException { + PathLocation dest0 = + resolver.getDestinationForPath("/local/folder0/file0.txt"); + assertDest("subcluster0", dest0); + } + + @Test + public void testRandomResolver() throws IOException { + Set<String> destinations = new HashSet<>(); + for (int i = 0; i < 30; i++) { + PathLocation dest = + resolver.getDestinationForPath("/random/folder0/file0.txt"); + RemoteLocation firstDest = dest.getDestinations().get(0); + String nsId = firstDest.getNameserviceId(); + destinations.add(nsId); + } + assertEquals(3, destinations.size()); + } + + /** + * Test that a path has files distributed across destinations evenly. + * @param path Path to check. + * @throws IOException + */ + private void testEvenDistribution(final String path) throws IOException { + testEvenDistribution(path, true); + } + + /** + * Test that a path has files distributed across destinations evenly or not. + * @param path Path to check. + * @param even If the distribution should be even or not. + * @throws IOException If it cannot check it. + */ + private void testEvenDistribution(final String path, final boolean even) + throws IOException { + + // Subcluster -> Files + Map<String, Set<String>> results = new HashMap<>(); + for (int f = 0; f < 10000; f++) { + String filename = path + "/file" + f + ".txt"; + PathLocation destination = resolver.getDestinationForPath(filename); + RemoteLocation loc = destination.getDefaultLocation(); + assertEquals(filename, loc.getDest()); + + String nsId = loc.getNameserviceId(); + if (!results.containsKey(nsId)) { + results.put(nsId, new TreeSet<>()); + } + results.get(nsId).add(filename); + } + + if (!even) { + // All files should be in one subcluster + assertEquals(1, results.size()); + } else { + // Files should be distributed somewhat evenly + assertEquals(3, results.size()); + int count = 0; + for (Set<String> files : results.values()) { + count = count + files.size(); + } + int avg = count / results.keySet().size(); + for (Set<String> files : results.values()) { + int filesCount = files.size(); + // Check that the count in each namespace is within 20% of avg + assertTrue(filesCount > 0); + assertTrue(Math.abs(filesCount - avg) < (avg / 5)); + } + } + } + + private static void assertDest(String expectedDest, PathLocation loc) { + assertEquals(expectedDest, loc.getDestinations().get(0).getNameserviceId()); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java new file mode 100644 index 0000000..00c2c13 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the basic {@link ActiveNamenodeResolver} functionality. + */ +public class TestNamenodeResolver { + + private static StateStoreService stateStore; + private static ActiveNamenodeResolver namenodeResolver; + + @BeforeClass + public static void create() throws Exception { + + Configuration conf = getStateStoreConfiguration(); + + // Reduce expirations to 5 seconds + conf.setLong( + RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, + TimeUnit.SECONDS.toMillis(5)); + + stateStore = newStateStore(conf); + assertNotNull(stateStore); + + namenodeResolver = new MembershipNamenodeResolver(conf, stateStore); + namenodeResolver.setRouterId(ROUTERS[0]); + } + + @AfterClass + public static void destroy() throws Exception { + stateStore.stop(); + stateStore.close(); + } + + @Before + public void setup() throws IOException, InterruptedException { + // Wait for state store to connect + stateStore.loadDriver(); + waitStateStore(stateStore, 10000); + + // Clear NN registrations + boolean cleared = clearRecords(stateStore, MembershipState.class); + assertTrue(cleared); + } + + @Test + public void testStateStoreDisconnected() throws Exception { + + // Add an entry to the store + NamenodeStatusReport report = createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE); + assertTrue(namenodeResolver.registerNamenode(report)); + + // Close the data store driver + stateStore.closeDriver(); + assertFalse(stateStore.isDriverReady()); + + // Flush the caches + stateStore.refreshCaches(true); + + // Verify commands fail due to no cached data and no state store + // connectivity. + List<? extends FederationNamenodeContext> nns = + namenodeResolver.getNamenodesForBlockPoolId(NAMESERVICES[0]); + assertNull(nns); + + verifyException(namenodeResolver, "registerNamenode", + StateStoreUnavailableException.class, + new Class[] {NamenodeStatusReport.class}, new Object[] {report}); + } + + /** + * Verify the first registration on the resolver. + * + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier within the nemeservice. + * @param resultsCount Number of results expected. + * @param state Expected state for the first one. + * @throws IOException If we cannot get the namenodes. + */ + private void verifyFirstRegistration(String nsId, String nnId, + int resultsCount, FederationNamenodeServiceState state) + throws IOException { + List<? extends FederationNamenodeContext> namenodes = + namenodeResolver.getNamenodesForNameserviceId(nsId); + if (resultsCount == 0) { + assertNull(namenodes); + } else { + assertEquals(resultsCount, namenodes.size()); + if (namenodes.size() > 0) { + FederationNamenodeContext namenode = namenodes.get(0); + assertEquals(state, namenode.getState()); + assertEquals(nnId, namenode.getNamenodeId()); + } + } + } + + @Test + public void testRegistrationExpired() + throws InterruptedException, IOException { + + // Populate the state store with a single NN element + // 1) ns0:nn0 - Active + // Wait for the entry to expire without heartbeating + // Verify the NN entry is not accessible once expired. + NamenodeStatusReport report = createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE); + assertTrue(namenodeResolver.registerNamenode(report)); + + // Load cache + stateStore.refreshCaches(true); + + // Verify + verifyFirstRegistration( + NAMESERVICES[0], NAMENODES[0], 1, + FederationNamenodeServiceState.ACTIVE); + + // Wait past expiration (set in conf to 5 seconds) + Thread.sleep(6000); + // Reload cache + stateStore.refreshCaches(true); + + // Verify entry is now expired and is no longer in the cache + verifyFirstRegistration( + NAMESERVICES[0], NAMENODES[0], 0, + FederationNamenodeServiceState.ACTIVE); + + // Heartbeat again, updates dateModified + assertTrue(namenodeResolver.registerNamenode(report)); + // Reload cache + stateStore.refreshCaches(true); + + // Verify updated entry is marked active again and accessible to RPC server + verifyFirstRegistration( + NAMESERVICES[0], NAMENODES[0], 1, + FederationNamenodeServiceState.ACTIVE); + } + + @Test + public void testRegistrationNamenodeSelection() + throws InterruptedException, IOException { + + // 1) ns0:nn0 - Active + // 2) ns0:nn1 - Standby (newest) + // Verify the selected entry is the active entry + assertTrue(namenodeResolver.registerNamenode( + createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE))); + Thread.sleep(100); + assertTrue(namenodeResolver.registerNamenode( + createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + + stateStore.refreshCaches(true); + + verifyFirstRegistration( + NAMESERVICES[0], NAMENODES[0], 2, + FederationNamenodeServiceState.ACTIVE); + + // 1) ns0:nn0 - Expired (stale) + // 2) ns0:nn1 - Standby (newest) + // Verify the selected entry is the standby entry as the active entry is + // stale + assertTrue(namenodeResolver.registerNamenode( + createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE))); + + // Expire active registration + Thread.sleep(6000); + + // Refresh standby registration + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + + // Verify that standby is selected (active is now expired) + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 1, + FederationNamenodeServiceState.STANDBY); + + // 1) ns0:nn0 - Active + // 2) ns0:nn1 - Unavailable (newest) + // Verify the selected entry is the active entry + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE))); + Thread.sleep(100); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], null))); + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[0], 2, + FederationNamenodeServiceState.ACTIVE); + + // 1) ns0:nn0 - Unavailable (newest) + // 2) ns0:nn1 - Standby + // Verify the selected entry is the standby entry + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + Thread.sleep(1000); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], null))); + + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 2, + FederationNamenodeServiceState.STANDBY); + + // 1) ns0:nn0 - Active (oldest) + // 2) ns0:nn1 - Standby + // 3) ns0:nn2 - Active (newest) + // Verify the selected entry is the newest active entry + assertTrue(namenodeResolver.registerNamenode( + createNamenodeReport(NAMESERVICES[0], NAMENODES[0], null))); + Thread.sleep(100); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + Thread.sleep(100); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[2], HAServiceState.ACTIVE))); + + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[2], 3, + FederationNamenodeServiceState.ACTIVE); + + // 1) ns0:nn0 - Standby (oldest) + // 2) ns0:nn1 - Standby (newest) + // 3) ns0:nn2 - Standby + // Verify the selected entry is the newest standby entry + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[0], HAServiceState.STANDBY))); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[2], HAServiceState.STANDBY))); + Thread.sleep(1500); + assertTrue(namenodeResolver.registerNamenode(createNamenodeReport( + NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY))); + + stateStore.refreshCaches(true); + verifyFirstRegistration(NAMESERVICES[0], NAMENODES[1], 3, + FederationNamenodeServiceState.STANDBY); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java new file mode 100644 index 0000000..42ede62 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test the {@link LocalResolver}. + */ +public class TestLocalResolver { + + @Test + @SuppressWarnings("unchecked") + public void testLocalResolver() throws IOException { + + // Mock the subcluster mapping + Configuration conf = new Configuration(); + Router router = mock(Router.class); + StateStoreService stateStore = mock(StateStoreService.class); + MembershipStore membership = mock(MembershipStore.class); + when(router.getStateStore()).thenReturn(stateStore); + when(stateStore.getRegisteredRecordStore(any(Class.class))) + .thenReturn(membership); + GetNamenodeRegistrationsResponse response = + GetNamenodeRegistrationsResponse.newInstance(); + // Set the mapping for each client + List<MembershipState> records = new LinkedList<>(); + records.add(newMembershipState("client0", "subcluster0")); + records.add(newMembershipState("client1", "subcluster1")); + records.add(newMembershipState("client2", "subcluster2")); + response.setNamenodeMemberships(records); + when(membership.getNamenodeRegistrations( + any(GetNamenodeRegistrationsRequest.class))).thenReturn(response); + + // Mock the client resolution: it will be anything in sb + StringBuilder sb = new StringBuilder("clientX"); + LocalResolver localResolver = new LocalResolver(conf, router); + LocalResolver spyLocalResolver = spy(localResolver); + doAnswer(new Answer<String>() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return sb.toString(); + } + }).when(spyLocalResolver).getClientAddr(); + + // Add the mocks to the resolver + MultipleDestinationMountTableResolver resolver = + new MultipleDestinationMountTableResolver(conf, router); + resolver.addResolver(DestinationOrder.LOCAL, spyLocalResolver); + + + // We point /local to subclusters 0, 1, 2 with the local order + Map<String, String> mapLocal = new HashMap<>(); + mapLocal.put("subcluster0", "/local"); + mapLocal.put("subcluster1", "/local"); + mapLocal.put("subcluster2", "/local"); + MountTable localEntry = MountTable.newInstance("/local", mapLocal); + localEntry.setDestOrder(DestinationOrder.LOCAL); + resolver.addEntry(localEntry); + + // Test first with the default destination + PathLocation dest = resolver.getDestinationForPath("/local/file0.txt"); + assertDestination("subcluster0", dest); + + // We change the client location and verify + setClient(sb, "client2"); + dest = resolver.getDestinationForPath("/local/file0.txt"); + assertDestination("subcluster2", dest); + + setClient(sb, "client1"); + dest = resolver.getDestinationForPath("/local/file0.txt"); + assertDestination("subcluster1", dest); + + setClient(sb, "client0"); + dest = resolver.getDestinationForPath("/local/file0.txt"); + assertDestination("subcluster0", dest); + } + + private void assertDestination(String expectedNsId, PathLocation loc) { + List<RemoteLocation> dests = loc.getDestinations(); + RemoteLocation dest = dests.get(0); + assertEquals(expectedNsId, dest.getNameserviceId()); + } + + private MembershipState newMembershipState(String addr, String nsId) { + return MembershipState.newInstance( + "routerId", nsId, "nn0", "cluster0", "blockPool0", + addr + ":8001", addr + ":8002", addr + ":8003", addr + ":8004", + FederationNamenodeServiceState.ACTIVE, false); + } + + /** + * Set the address of the client issuing the request. We use a StringBuilder + * to modify the value in place for the mock. + * @param sb StringBuilder to set the client string. + * @param client Address of the client. + */ + private static void setClient(StringBuilder sb, String client) { + sb.replace(0, sb.length(), client); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java new file mode 100644 index 0000000..741d1f6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test functionalities of {@link ConnectionManager}, which manages a pool + * of connections to NameNodes. + */ +public class TestConnectionManager { + private Configuration conf; + private ConnectionManager connManager; + private static final String[] TEST_GROUP = new String[]{"TEST_GROUP"}; + private static final UserGroupInformation TEST_USER1 = + UserGroupInformation.createUserForTesting("user1", TEST_GROUP); + private static final UserGroupInformation TEST_USER2 = + UserGroupInformation.createUserForTesting("user2", TEST_GROUP); + private static final UserGroupInformation TEST_USER3 = + UserGroupInformation.createUserForTesting("user3", TEST_GROUP); + private static final String TEST_NN_ADDRESS = "nn1:8080"; + + @Before + public void setup() throws Exception { + conf = new Configuration(); + connManager = new ConnectionManager(conf); + NetUtils.addStaticResolution("nn1", "localhost"); + NetUtils.createSocketAddrForHost("nn1", 8080); + connManager.start(); + } + + @After + public void shutdown() { + if (connManager != null) { + connManager.close(); + } + } + + @Test + public void testCleanup() throws Exception { + Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools(); + + ConnectionPool pool1 = new ConnectionPool( + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10); + addConnectionsToPool(pool1, 9, 4); + poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1); + + ConnectionPool pool2 = new ConnectionPool( + conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10); + addConnectionsToPool(pool2, 10, 10); + poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2); + + checkPoolConnections(TEST_USER1, 9, 4); + checkPoolConnections(TEST_USER2, 10, 10); + + // Clean up first pool, one connection should be removed, and second pool + // should remain the same. + connManager.cleanup(pool1); + checkPoolConnections(TEST_USER1, 8, 4); + checkPoolConnections(TEST_USER2, 10, 10); + + // Clean up the first pool again, it should have no effect since it reached + // the MIN_ACTIVE_RATIO. + connManager.cleanup(pool1); + checkPoolConnections(TEST_USER1, 8, 4); + checkPoolConnections(TEST_USER2, 10, 10); + + // Make sure the number of connections doesn't go below minSize + ConnectionPool pool3 = new ConnectionPool( + conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10); + addConnectionsToPool(pool3, 10, 0); + poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool3); + connManager.cleanup(pool3); + checkPoolConnections(TEST_USER3, 2, 0); + // With active connections added to pool, make sure it honors the + // MIN_ACTIVE_RATIO again + addConnectionsToPool(pool3, 10, 2); + connManager.cleanup(pool3); + checkPoolConnections(TEST_USER3, 4, 2); + } + + @Test + public void testGetConnection() throws Exception { + Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools(); + final int totalConns = 10; + int activeConns = 5; + + ConnectionPool pool = new ConnectionPool( + conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10); + addConnectionsToPool(pool, totalConns, activeConns); + poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool); + + // All remaining connections should be usable + final int remainingSlots = totalConns - activeConns; + for (int i = 0; i < remainingSlots; i++) { + ConnectionContext cc = pool.getConnection(); + assertTrue(cc.isUsable()); + cc.getClient(); + activeConns++; + } + + checkPoolConnections(TEST_USER1, totalConns, activeConns); + + // Ask for more and this returns an active connection + ConnectionContext cc = pool.getConnection(); + assertTrue(cc.isActive()); + } + + private void addConnectionsToPool(ConnectionPool pool, int numTotalConn, + int numActiveConn) throws IOException { + for (int i = 0; i < numTotalConn; i++) { + ConnectionContext cc = pool.newConnection(); + pool.addConnection(cc); + if (i < numActiveConn) { + cc.getClient(); + } + } + } + + private void checkPoolConnections(UserGroupInformation ugi, + int numOfConns, int numOfActiveConns) { + for (Map.Entry<ConnectionPoolId, ConnectionPool> e : + connManager.getPools().entrySet()) { + if (e.getKey().getUgi() == ugi) { + assertEquals(numOfConns, e.getValue().getNumConnections()); + assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java new file mode 100644 index 0000000..877fb02 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.service.Service.STATE; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Test the service that heartbeats the state of the namenodes to the State + * Store. + */ +public class TestNamenodeHeartbeat { + + private static RouterDFSCluster cluster; + private static ActiveNamenodeResolver namenodeResolver; + private static List<NamenodeHeartbeatService> services; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void globalSetUp() throws Exception { + + cluster = new RouterDFSCluster(true, 2); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Mock locator that records the heartbeats + List<String> nss = cluster.getNameservices(); + String ns = nss.get(0); + Configuration conf = cluster.generateNamenodeConfiguration(ns); + namenodeResolver = new MockResolver(conf); + namenodeResolver.setRouterId("testrouter"); + + // Create one heartbeat service per NN + services = new ArrayList<>(); + for (NamenodeContext nn : cluster.getNamenodes()) { + String nsId = nn.getNameserviceId(); + String nnId = nn.getNamenodeId(); + NamenodeHeartbeatService service = new NamenodeHeartbeatService( + namenodeResolver, nsId, nnId); + service.init(conf); + service.start(); + services.add(service); + } + } + + @AfterClass + public static void tearDown() throws IOException { + cluster.shutdown(); + for (NamenodeHeartbeatService service: services) { + service.stop(); + service.close(); + } + } + + @Test + public void testNamenodeHeartbeatService() throws IOException { + + RouterDFSCluster testCluster = new RouterDFSCluster(true, 1); + Configuration heartbeatConfig = testCluster.generateNamenodeConfiguration( + NAMESERVICES[0]); + NamenodeHeartbeatService server = new NamenodeHeartbeatService( + namenodeResolver, NAMESERVICES[0], NAMENODES[0]); + server.init(heartbeatConfig); + assertEquals(STATE.INITED, server.getServiceState()); + server.start(); + assertEquals(STATE.STARTED, server.getServiceState()); + server.stop(); + assertEquals(STATE.STOPPED, server.getServiceState()); + server.close(); + } + + @Test + public void testHearbeat() throws InterruptedException, IOException { + + // Set NAMENODE1 to active for all nameservices + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + + // Wait for heartbeats to record + Thread.sleep(5000); + + // Verify the locator has matching NN entries for each NS + for (String ns : cluster.getNameservices()) { + List<? extends FederationNamenodeContext> nns = + namenodeResolver.getNamenodesForNameserviceId(ns); + + // Active + FederationNamenodeContext active = nns.get(0); + assertEquals(NAMENODES[0], active.getNamenodeId()); + + // Standby + FederationNamenodeContext standby = nns.get(1); + assertEquals(NAMENODES[1], standby.getNamenodeId()); + } + + // Switch active NNs in 1/2 nameservices + List<String> nss = cluster.getNameservices(); + String failoverNS = nss.get(0); + String normalNs = nss.get(1); + + cluster.switchToStandby(failoverNS, NAMENODES[0]); + cluster.switchToActive(failoverNS, NAMENODES[1]); + + // Wait for heartbeats to record + Thread.sleep(5000); + + // Verify the locator has recorded the failover for the failover NS + List<? extends FederationNamenodeContext> failoverNSs = + namenodeResolver.getNamenodesForNameserviceId(failoverNS); + // Active + FederationNamenodeContext active = failoverNSs.get(0); + assertEquals(NAMENODES[1], active.getNamenodeId()); + + // Standby + FederationNamenodeContext standby = failoverNSs.get(1); + assertEquals(NAMENODES[0], standby.getNamenodeId()); + + // Verify the locator has the same records for the other ns + List<? extends FederationNamenodeContext> normalNss = + namenodeResolver.getNamenodesForNameserviceId(normalNs); + // Active + active = normalNss.get(0); + assertEquals(NAMENODES[0], active.getNamenodeId()); + // Standby + standby = normalNss.get(1); + assertEquals(NAMENODES[1], standby.getNamenodeId()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java new file mode 100644 index 0000000..e130b7b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.TestConfigurationFieldsBase; + +import java.util.HashSet; + +/** + * Unit test class to compare the following RBF configuration class: + * <p></p> + * {@link RBFConfigKeys} + * <p></p> + * against hdfs-rbf-default.xml for missing properties. + * <p></p> + * Refer to {@link org.apache.hadoop.conf.TestConfigurationFieldsBase} + * for how this class works. + */ +public class TestRBFConfigFields extends TestConfigurationFieldsBase { + @Override + public void initializeMemberVariables() { + xmlFilename = "hdfs-rbf-default.xml"; + configurationClasses = new Class[] {RBFConfigKeys.class}; + + // Set error modes + errorIfMissingConfigProps = true; + errorIfMissingXmlProps = true; + + // Initialize used variables + configurationPropsToSkipCompare = new HashSet<String>(); + + // Allocate + xmlPropsToSkipCompare = new HashSet<String>(); + xmlPrefixToSkipCompare = new HashSet<String>(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java new file mode 100644 index 0000000..39398f7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.service.Service.STATE; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * The the safe mode for the {@link Router} controlled by + * {@link SafeModeTimer}. + */ +public class TestRouter { + + private static Configuration conf; + + @BeforeClass + public static void create() throws IOException { + // Basic configuration without the state store + conf = new Configuration(); + // 1 sec cache refresh + conf.setInt(RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1); + // Mock resolver classes + conf.setClass(RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class, ActiveNamenodeResolver.class); + conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class, FileSubclusterResolver.class); + + // Bind to any available port + conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); + conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0"); + conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY, "0.0.0.0"); + + // Simulate a co-located NN + conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0"); + conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + "ns0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + "ns0", + "127.0.0.1:0" + 0); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + "ns0", + "127.0.0.1:" + 0); + conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + "ns0", + "0.0.0.0"); + } + + @AfterClass + public static void destroy() { + } + + @Before + public void setup() throws IOException, URISyntaxException { + } + + @After + public void cleanup() { + } + + private static void testRouterStartup(Configuration routerConfig) + throws InterruptedException, IOException { + Router router = new Router(); + assertEquals(STATE.NOTINITED, router.getServiceState()); + router.init(routerConfig); + assertEquals(STATE.INITED, router.getServiceState()); + router.start(); + assertEquals(STATE.STARTED, router.getServiceState()); + router.stop(); + assertEquals(STATE.STOPPED, router.getServiceState()); + router.close(); + } + + @Test + public void testRouterService() throws InterruptedException, IOException { + + // Admin only + testRouterStartup(new RouterConfigBuilder(conf).admin().build()); + + // Http only + testRouterStartup(new RouterConfigBuilder(conf).http().build()); + + // Rpc only + testRouterStartup(new RouterConfigBuilder(conf).rpc().build()); + + // Metrics only + testRouterStartup(new RouterConfigBuilder(conf).metrics().build()); + + // Statestore only + testRouterStartup(new RouterConfigBuilder(conf).stateStore().build()); + + // Heartbeat only + testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build()); + + // Run with all services + testRouterStartup(new RouterConfigBuilder(conf).all().build()); + } + + @Test + public void testRouterRestartRpcService() throws IOException { + + // Start + Router router = new Router(); + router.init(new RouterConfigBuilder(conf).rpc().build()); + router.start(); + + // Verify RPC server is running + assertNotNull(router.getRpcServerAddress()); + RouterRpcServer rpcServer = router.getRpcServer(); + assertNotNull(rpcServer); + assertEquals(STATE.STARTED, rpcServer.getServiceState()); + + // Stop router and RPC server + router.stop(); + assertEquals(STATE.STOPPED, rpcServer.getServiceState()); + + router.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java new file mode 100644 index 0000000..a8ffded --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.util.Time; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * The administrator interface of the {@link Router} implemented by + * {@link RouterAdminServer}. + */ +public class TestRouterAdmin { + + private static StateStoreDFSCluster cluster; + private static RouterContext routerContext; + public static final String RPC_BEAN = + "Hadoop:service=Router,name=FederationRPC"; + private static List<MountTable> mockMountTable; + private static StateStoreService stateStore; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new StateStoreDFSCluster(false, 1); + // Build and start a router with State Store + admin + RPC + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + cluster.addRouterOverrides(conf); + cluster.startRouters(); + routerContext = cluster.getRandomRouter(); + mockMountTable = cluster.generateMockMountTable(); + Router router = routerContext.getRouter(); + stateStore = router.getStateStore(); + } + + @AfterClass + public static void tearDown() { + cluster.stopRouter(routerContext); + } + + @Before + public void testSetup() throws Exception { + assertTrue( + synchronizeRecords(stateStore, mockMountTable, MountTable.class)); + } + + @Test + public void testAddMountTable() throws IOException { + MountTable newEntry = MountTable.newInstance( + "/testpath", Collections.singletonMap("ns0", "/testdir"), + Time.now(), Time.now()); + + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTable = client.getMountTableManager(); + + // Existing mount table size + List<MountTable> records = getMountTableEntries(mountTable); + assertEquals(records.size(), mockMountTable.size()); + + // Add + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTable.addMountTableEntry(addRequest); + assertTrue(addResponse.getStatus()); + + // New mount table size + List<MountTable> records2 = getMountTableEntries(mountTable); + assertEquals(records2.size(), mockMountTable.size() + 1); + } + + @Test + public void testAddDuplicateMountTable() throws IOException { + MountTable newEntry = MountTable.newInstance("/testpath", + Collections.singletonMap("ns0", "/testdir"), Time.now(), Time.now()); + + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTable = client.getMountTableManager(); + + // Existing mount table size + List<MountTable> entries1 = getMountTableEntries(mountTable); + assertEquals(entries1.size(), mockMountTable.size()); + + // Add + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTable.addMountTableEntry(addRequest); + assertTrue(addResponse.getStatus()); + + // New mount table size + List<MountTable> entries2 = getMountTableEntries(mountTable); + assertEquals(entries2.size(), mockMountTable.size() + 1); + + // Add again, should fail + AddMountTableEntryResponse addResponse2 = + mountTable.addMountTableEntry(addRequest); + assertFalse(addResponse2.getStatus()); + } + + @Test + public void testAddReadOnlyMountTable() throws IOException { + MountTable newEntry = MountTable.newInstance( + "/readonly", Collections.singletonMap("ns0", "/testdir"), + Time.now(), Time.now()); + newEntry.setReadOnly(true); + + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTable = client.getMountTableManager(); + + // Existing mount table size + List<MountTable> records = getMountTableEntries(mountTable); + assertEquals(records.size(), mockMountTable.size()); + + // Add + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTable.addMountTableEntry(addRequest); + assertTrue(addResponse.getStatus()); + + // New mount table size + List<MountTable> records2 = getMountTableEntries(mountTable); + assertEquals(records2.size(), mockMountTable.size() + 1); + + // Check that we have the read only entry + MountTable record = getMountTableEntry("/readonly"); + assertEquals("/readonly", record.getSourcePath()); + assertTrue(record.isReadOnly()); + + // Removing the new entry + RemoveMountTableEntryRequest removeRequest = + RemoveMountTableEntryRequest.newInstance("/readonly"); + RemoveMountTableEntryResponse removeResponse = + mountTable.removeMountTableEntry(removeRequest); + assertTrue(removeResponse.getStatus()); + } + + @Test + public void testAddOrderMountTable() throws IOException { + testAddOrderMountTable(DestinationOrder.HASH); + testAddOrderMountTable(DestinationOrder.LOCAL); + testAddOrderMountTable(DestinationOrder.RANDOM); + testAddOrderMountTable(DestinationOrder.HASH_ALL); + } + + private void testAddOrderMountTable(final DestinationOrder order) + throws IOException { + final String mnt = "/" + order; + MountTable newEntry = MountTable.newInstance( + mnt, Collections.singletonMap("ns0", "/testdir"), + Time.now(), Time.now()); + newEntry.setDestOrder(order); + + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTable = client.getMountTableManager(); + + // Add + AddMountTableEntryRequest addRequest; + AddMountTableEntryResponse addResponse; + addRequest = AddMountTableEntryRequest.newInstance(newEntry); + addResponse = mountTable.addMountTableEntry(addRequest); + assertTrue(addResponse.getStatus()); + + // Check that we have the read only entry + MountTable record = getMountTableEntry(mnt); + assertEquals(mnt, record.getSourcePath()); + assertEquals(order, record.getDestOrder()); + + // Removing the new entry + RemoveMountTableEntryRequest removeRequest = + RemoveMountTableEntryRequest.newInstance(mnt); + RemoveMountTableEntryResponse removeResponse = + mountTable.removeMountTableEntry(removeRequest); + assertTrue(removeResponse.getStatus()); + } + + @Test + public void testRemoveMountTable() throws IOException { + + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTable = client.getMountTableManager(); + + // Existing mount table size + List<MountTable> entries1 = getMountTableEntries(mountTable); + assertEquals(entries1.size(), mockMountTable.size()); + + // Remove an entry + RemoveMountTableEntryRequest removeRequest = + RemoveMountTableEntryRequest.newInstance("/"); + mountTable.removeMountTableEntry(removeRequest); + + // New mount table size + List<MountTable> entries2 = getMountTableEntries(mountTable); + assertEquals(entries2.size(), mockMountTable.size() - 1); + } + + @Test + public void testEditMountTable() throws IOException { + + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTable = client.getMountTableManager(); + + // Verify starting condition + MountTable entry = getMountTableEntry("/"); + assertEquals( + Collections.singletonList(new RemoteLocation("ns0", "/")), + entry.getDestinations()); + + // Edit the entry for / + MountTable updatedEntry = MountTable.newInstance( + "/", Collections.singletonMap("ns1", "/"), Time.now(), Time.now()); + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(updatedEntry); + mountTable.updateMountTableEntry(updateRequest); + + // Verify edited condition + entry = getMountTableEntry("/"); + assertEquals( + Collections.singletonList(new RemoteLocation("ns1", "/")), + entry.getDestinations()); + } + + @Test + public void testGetMountTable() throws IOException { + + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTable = client.getMountTableManager(); + + // Verify size of table + List<MountTable> entries = getMountTableEntries(mountTable); + assertEquals(mockMountTable.size(), entries.size()); + + // Verify all entries are present + int matches = 0; + for (MountTable e : entries) { + for (MountTable entry : mockMountTable) { + assertEquals(e.getDestinations().size(), 1); + assertNotNull(e.getDateCreated()); + assertNotNull(e.getDateModified()); + if (entry.getSourcePath().equals(e.getSourcePath())) { + matches++; + } + } + } + assertEquals(matches, mockMountTable.size()); + } + + @Test + public void testGetSingleMountTableEntry() throws IOException { + MountTable entry = getMountTableEntry("/ns0"); + assertNotNull(entry); + assertEquals(entry.getSourcePath(), "/ns0"); + } + + /** + * Gets an existing mount table record in the state store. + * + * @param mount The mount point of the record to remove. + * @return The matching record if found, null if it is not found. + * @throws IOException If the state store could not be accessed. + */ + private MountTable getMountTableEntry(final String mount) throws IOException { + // Refresh the cache + stateStore.loadCache(MountTableStoreImpl.class, true); + + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance(mount); + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTable = client.getMountTableManager(); + List<MountTable> results = getMountTableEntries(mountTable, request); + if (results.size() > 0) { + // First result is sorted to have the shortest mount string length + return results.get(0); + } + return null; + } + + private List<MountTable> getMountTableEntries(MountTableManager mountTable) + throws IOException { + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance("/"); + return getMountTableEntries(mountTable, request); + } + + private List<MountTable> getMountTableEntries(MountTableManager mountTable, + GetMountTableEntriesRequest request) throws IOException { + stateStore.loadCache(MountTableStoreImpl.class, true); + GetMountTableEntriesResponse response = + mountTable.getMountTableEntries(request); + return response.getEntries(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org