http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java new file mode 100644 index 0000000..dd349da --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.ROUTERS; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.util.Time; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the basic {@link MembershipStore} 
membership functionality. + */ +public class TestStateStoreMembershipState extends TestStateStoreBase { + + private static MembershipStore membershipStore; + + @BeforeClass + public static void create() { + // Reduce expirations to 5 seconds + getConf().setLong( + RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, + TimeUnit.SECONDS.toMillis(5)); + } + + @Before + public void setup() throws IOException, InterruptedException { + + membershipStore = + getStateStore().getRegisteredRecordStore(MembershipStore.class); + + // Clear NN registrations + assertTrue(clearRecords(getStateStore(), MembershipState.class)); + } + + @Test + public void testNamenodeStateOverride() throws Exception { + // Populate the state store + // 1) ns0:nn0 - Standby + String ns = "ns0"; + String nn = "nn0"; + MembershipState report = createRegistration( + ns, nn, ROUTERS[1], FederationNamenodeServiceState.STANDBY); + assertTrue(namenodeHeartbeat(report)); + + // Load data into cache and calculate quorum + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + MembershipState existingState = getNamenodeRegistration(ns, nn); + assertEquals( + FederationNamenodeServiceState.STANDBY, existingState.getState()); + + // Override cache + UpdateNamenodeRegistrationRequest request = + UpdateNamenodeRegistrationRequest.newInstance( + ns, nn, FederationNamenodeServiceState.ACTIVE); + assertTrue(membershipStore.updateNamenodeRegistration(request).getResult()); + + MembershipState newState = getNamenodeRegistration(ns, nn); + assertEquals(FederationNamenodeServiceState.ACTIVE, newState.getState()); + } + + @Test + public void testStateStoreDisconnected() throws Exception { + + // Close the data store driver + getStateStore().closeDriver(); + assertFalse(getStateStore().isDriverReady()); + + NamenodeHeartbeatRequest hbRequest = NamenodeHeartbeatRequest.newInstance(); + hbRequest.setNamenodeMembership( + createMockRegistrationForNamenode( + "test", "test", 
FederationNamenodeServiceState.UNAVAILABLE)); + verifyException(membershipStore, "namenodeHeartbeat", + StateStoreUnavailableException.class, + new Class[] {NamenodeHeartbeatRequest.class}, + new Object[] {hbRequest }); + + // Information from cache, no exception should be triggered for these + // TODO - should cached info expire at some point? + GetNamenodeRegistrationsRequest getRequest = + GetNamenodeRegistrationsRequest.newInstance(); + verifyException(membershipStore, + "getNamenodeRegistrations", null, + new Class[] {GetNamenodeRegistrationsRequest.class}, + new Object[] {getRequest}); + + verifyException(membershipStore, + "getExpiredNamenodeRegistrations", null, + new Class[] {GetNamenodeRegistrationsRequest.class}, + new Object[] {getRequest}); + + UpdateNamenodeRegistrationRequest overrideRequest = + UpdateNamenodeRegistrationRequest.newInstance(); + verifyException(membershipStore, + "updateNamenodeRegistration", null, + new Class[] {UpdateNamenodeRegistrationRequest.class}, + new Object[] {overrideRequest}); + } + + private void registerAndLoadRegistrations( + List<MembershipState> registrationList) throws IOException { + // Populate + assertTrue(synchronizeRecords( + getStateStore(), registrationList, MembershipState.class)); + + // Load into cache + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + } + + private MembershipState createRegistration(String ns, String nn, + String router, FederationNamenodeServiceState state) throws IOException { + MembershipState record = MembershipState.newInstance( + router, ns, + nn, "testcluster", "testblock-" + ns, "testrpc-"+ ns + nn, + "testservice-"+ ns + nn, "testlifeline-"+ ns + nn, + "testweb-" + ns + nn, state, false); + return record; + } + + @Test + public void testRegistrationMajorityQuorum() + throws InterruptedException, IOException { + + // Populate the state store with a set of non-matching elements + // 1) ns0:nn0 - Standby (newest) + // 2) ns0:nn0 - Active (oldest) + // 3) ns0:nn0 
- Active (2nd oldest) + // 4) ns0:nn0 - Active (3nd oldest element, newest active element) + // Verify the selected entry is the newest majority opinion (4) + String ns = "ns0"; + String nn = "nn0"; + + // Active - oldest + MembershipState report = createRegistration( + ns, nn, ROUTERS[1], FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report)); + Thread.sleep(1000); + + // Active - 2nd oldest + report = createRegistration( + ns, nn, ROUTERS[2], FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report)); + Thread.sleep(1000); + + // Active - 3rd oldest, newest active element + report = createRegistration( + ns, nn, ROUTERS[3], FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report)); + + // standby - newest overall + report = createRegistration( + ns, nn, ROUTERS[0], FederationNamenodeServiceState.STANDBY); + assertTrue(namenodeHeartbeat(report)); + + // Load and calculate quorum + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify quorum entry + MembershipState quorumEntry = getNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(quorumEntry.getRouterId(), ROUTERS[3]); + } + + @Test + public void testRegistrationQuorumExcludesExpired() + throws InterruptedException, IOException { + + // Populate the state store with some expired entries and verify the expired + // entries are ignored. 
+ // 1) ns0:nn0 - Active + // 2) ns0:nn0 - Expired + // 3) ns0:nn0 - Expired + // 4) ns0:nn0 - Expired + // Verify the selected entry is the active entry + List<MembershipState> registrationList = new ArrayList<>(); + String ns = "ns0"; + String nn = "nn0"; + String rpcAddress = "testrpcaddress"; + String serviceAddress = "testserviceaddress"; + String lifelineAddress = "testlifelineaddress"; + String blockPoolId = "testblockpool"; + String clusterId = "testcluster"; + String webAddress = "testwebaddress"; + boolean safemode = false; + + // Active + MembershipState record = MembershipState.newInstance( + ROUTERS[0], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.ACTIVE, safemode); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[1], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[2], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[3], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + registrationList.add(record); + registerAndLoadRegistrations(registrationList); + + // Verify quorum entry chooses active membership + MembershipState quorumEntry = getNamenodeRegistration( + record.getNameserviceId(), record.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(ROUTERS[0], quorumEntry.getRouterId()); + } + + @Test + public void testRegistrationQuorumAllExpired() throws IOException { + + // 1) ns0:nn0 - Expired (oldest) + // 2) ns0:nn0 - Expired + // 3) ns0:nn0 - 
Expired + // 4) ns0:nn0 - Expired + // Verify no entry is either selected or cached + List<MembershipState> registrationList = new ArrayList<>(); + String ns = NAMESERVICES[0]; + String nn = NAMENODES[0]; + String rpcAddress = "testrpcaddress"; + String serviceAddress = "testserviceaddress"; + String lifelineAddress = "testlifelineaddress"; + String blockPoolId = "testblockpool"; + String clusterId = "testcluster"; + String webAddress = "testwebaddress"; + boolean safemode = false; + long startingTime = Time.now(); + + // Expired + MembershipState record = MembershipState.newInstance( + ROUTERS[0], ns, nn, clusterId, blockPoolId, + rpcAddress, webAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + record.setDateModified(startingTime - 10000); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[1], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + record.setDateModified(startingTime); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[2], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + record.setDateModified(startingTime); + registrationList.add(record); + + // Expired + record = MembershipState.newInstance( + ROUTERS[3], ns, nn, clusterId, blockPoolId, + rpcAddress, serviceAddress, lifelineAddress, webAddress, + FederationNamenodeServiceState.EXPIRED, safemode); + record.setDateModified(startingTime); + registrationList.add(record); + + registerAndLoadRegistrations(registrationList); + + // Verify no entry is found for this nameservice + assertNull(getNamenodeRegistration( + record.getNameserviceId(), record.getNamenodeId())); + } + + @Test + public void testRegistrationNoQuorum() + throws InterruptedException, IOException { + + // Populate the 
state store with a set of non-matching elements + // 1) ns0:nn0 - Standby (newest) + // 2) ns0:nn0 - Standby (oldest) + // 3) ns0:nn0 - Active (2nd oldest) + // 4) ns0:nn0 - Active (3nd oldest element, newest active element) + // Verify the selected entry is the newest entry (1) + MembershipState report1 = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[1], + FederationNamenodeServiceState.STANDBY); + assertTrue(namenodeHeartbeat(report1)); + Thread.sleep(100); + MembershipState report2 = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[2], + FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report2)); + Thread.sleep(100); + MembershipState report3 = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[3], + FederationNamenodeServiceState.ACTIVE); + assertTrue(namenodeHeartbeat(report3)); + Thread.sleep(100); + MembershipState report4 = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[0], + FederationNamenodeServiceState.STANDBY); + assertTrue(namenodeHeartbeat(report4)); + + // Load and calculate quorum + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify quorum entry uses the newest data, even though it is standby + MembershipState quorumEntry = getNamenodeRegistration( + report1.getNameserviceId(), report1.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(ROUTERS[0], quorumEntry.getRouterId()); + assertEquals( + FederationNamenodeServiceState.STANDBY, quorumEntry.getState()); + } + + @Test + public void testRegistrationExpired() + throws InterruptedException, IOException { + + // Populate the state store with a single NN element + // 1) ns0:nn0 - Active + // Wait for the entry to expire without heartbeating + // Verify the NN entry is populated as EXPIRED internally in the state store + + MembershipState report = createRegistration( + NAMESERVICES[0], NAMENODES[0], ROUTERS[0], + FederationNamenodeServiceState.ACTIVE); + 
assertTrue(namenodeHeartbeat(report)); + + // Load cache + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify quorum and entry + MembershipState quorumEntry = getNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(ROUTERS[0], quorumEntry.getRouterId()); + assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState()); + + // Wait past expiration (set in conf to 5 seconds) + Thread.sleep(6000); + // Reload cache + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify entry is now expired and is no longer in the cache + quorumEntry = getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]); + assertNull(quorumEntry); + + // Verify entry is now expired and can't be used by RPC service + quorumEntry = getNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNull(quorumEntry); + + // Heartbeat again, updates dateModified + assertTrue(namenodeHeartbeat(report)); + // Reload cache + assertTrue(getStateStore().loadCache(MembershipStore.class, true)); + + // Verify updated entry marked as active and is accessible to RPC server + quorumEntry = getNamenodeRegistration( + report.getNameserviceId(), report.getNamenodeId()); + assertNotNull(quorumEntry); + assertEquals(ROUTERS[0], quorumEntry.getRouterId()); + assertEquals(FederationNamenodeServiceState.ACTIVE, quorumEntry.getState()); + } + + /** + * Get a single namenode membership record from the store. + * + * @param nsId The HDFS nameservice ID to search for + * @param nnId The HDFS namenode ID to search for + * @return The single NamenodeMembershipRecord that matches the query or null + * if not found. + * @throws IOException if the query could not be executed. 
+ */ + private MembershipState getNamenodeRegistration( + final String nsId, final String nnId) throws IOException { + + MembershipState partial = MembershipState.newInstance(); + partial.setNameserviceId(nsId); + partial.setNamenodeId(nnId); + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(partial); + GetNamenodeRegistrationsResponse response = + membershipStore.getNamenodeRegistrations(request); + + List<MembershipState> results = response.getNamenodeMemberships(); + if (results != null && results.size() == 1) { + MembershipState record = results.get(0); + return record; + } + return null; + } + + /** + * Register a namenode heartbeat with the state store. + * + * @param store FederationMembershipStateStore instance to retrieve the + * membership data records. + * @param namenode A fully populated namenode membership record to be + * committed to the data store. + * @return True if successful, false otherwise. + * @throws IOException if the state store query could not be performed. + */ + private boolean namenodeHeartbeat(MembershipState namenode) + throws IOException { + + NamenodeHeartbeatRequest request = + NamenodeHeartbeatRequest.newInstance(namenode); + NamenodeHeartbeatResponse response = + membershipStore.namenodeHeartbeat(request); + return response.getResult(); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java new file mode 100644 index 0000000..d30d6ba --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the basic {@link StateStoreService} + * {@link MountTableStore} functionality. 
+ */ +public class TestStateStoreMountTable extends TestStateStoreBase { + + private static List<String> nameservices; + private static MountTableStore mountStore; + + @BeforeClass + public static void create() throws IOException { + nameservices = new ArrayList<>(); + nameservices.add(NAMESERVICES[0]); + nameservices.add(NAMESERVICES[1]); + } + + @Before + public void setup() throws IOException, InterruptedException { + mountStore = + getStateStore().getRegisteredRecordStore(MountTableStore.class); + // Clear Mount table registrations + assertTrue(clearRecords(getStateStore(), MountTable.class)); + } + + @Test + public void testStateStoreDisconnected() throws Exception { + + // Close the data store driver + getStateStore().closeDriver(); + assertFalse(getStateStore().isDriverReady()); + + // Test APIs that access the store to check they throw the correct exception + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(); + verifyException(mountStore, "addMountTableEntry", + StateStoreUnavailableException.class, + new Class[] {AddMountTableEntryRequest.class}, + new Object[] {addRequest}); + + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(); + verifyException(mountStore, "updateMountTableEntry", + StateStoreUnavailableException.class, + new Class[] {UpdateMountTableEntryRequest.class}, + new Object[] {updateRequest}); + + RemoveMountTableEntryRequest removeRequest = + RemoveMountTableEntryRequest.newInstance(); + verifyException(mountStore, "removeMountTableEntry", + StateStoreUnavailableException.class, + new Class[] {RemoveMountTableEntryRequest.class}, + new Object[] {removeRequest}); + + GetMountTableEntriesRequest getRequest = + GetMountTableEntriesRequest.newInstance(); + mountStore.loadCache(true); + verifyException(mountStore, "getMountTableEntries", + StateStoreUnavailableException.class, + new Class[] {GetMountTableEntriesRequest.class}, + new Object[] {getRequest}); + } + + @Test + 
public void testSynchronizeMountTable() throws IOException { + // Synchronize and get mount table entries + List<MountTable> entries = createMockMountTable(nameservices); + assertTrue(synchronizeRecords(getStateStore(), entries, MountTable.class)); + for (MountTable e : entries) { + mountStore.loadCache(true); + MountTable entry = getMountTableEntry(e.getSourcePath()); + assertNotNull(entry); + assertEquals(e.getDefaultLocation().getDest(), + entry.getDefaultLocation().getDest()); + } + } + + @Test + public void testAddMountTableEntry() throws IOException { + + // Add 1 + List<MountTable> entries = createMockMountTable(nameservices); + List<MountTable> entries1 = getMountTableEntries("/").getRecords(); + assertEquals(0, entries1.size()); + MountTable entry0 = entries.get(0); + AddMountTableEntryRequest request = + AddMountTableEntryRequest.newInstance(entry0); + AddMountTableEntryResponse response = + mountStore.addMountTableEntry(request); + assertTrue(response.getStatus()); + + mountStore.loadCache(true); + List<MountTable> entries2 = getMountTableEntries("/").getRecords(); + assertEquals(1, entries2.size()); + } + + @Test + public void testRemoveMountTableEntry() throws IOException { + + // Add many + List<MountTable> entries = createMockMountTable(nameservices); + synchronizeRecords(getStateStore(), entries, MountTable.class); + mountStore.loadCache(true); + List<MountTable> entries1 = getMountTableEntries("/").getRecords(); + assertEquals(entries.size(), entries1.size()); + + // Remove 1 + RemoveMountTableEntryRequest request = + RemoveMountTableEntryRequest.newInstance(); + request.setSrcPath(entries.get(0).getSourcePath()); + assertTrue(mountStore.removeMountTableEntry(request).getStatus()); + + // Verify remove + mountStore.loadCache(true); + List<MountTable> entries2 = getMountTableEntries("/").getRecords(); + assertEquals(entries.size() - 1, entries2.size()); + } + + @Test + public void testUpdateMountTableEntry() throws IOException { + + // Add 1 + 
List<MountTable> entries = createMockMountTable(nameservices); + MountTable entry0 = entries.get(0); + String srcPath = entry0.getSourcePath(); + String nsId = entry0.getDefaultLocation().getNameserviceId(); + AddMountTableEntryRequest request = + AddMountTableEntryRequest.newInstance(entry0); + AddMountTableEntryResponse response = + mountStore.addMountTableEntry(request); + assertTrue(response.getStatus()); + + // Verify + mountStore.loadCache(true); + MountTable matchingEntry0 = getMountTableEntry(srcPath); + assertNotNull(matchingEntry0); + assertEquals(nsId, matchingEntry0.getDefaultLocation().getNameserviceId()); + + // Edit destination nameservice for source path + Map<String, String> destMap = + Collections.singletonMap("testnameservice", "/"); + MountTable replacement = + MountTable.newInstance(srcPath, destMap); + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(replacement); + UpdateMountTableEntryResponse updateResponse = + mountStore.updateMountTableEntry(updateRequest); + assertTrue(updateResponse.getStatus()); + + // Verify + mountStore.loadCache(true); + MountTable matchingEntry1 = getMountTableEntry(srcPath); + assertNotNull(matchingEntry1); + assertEquals("testnameservice", + matchingEntry1.getDefaultLocation().getNameserviceId()); + } + + /** + * Gets an existing mount table record in the state store. + * + * @param mount The mount point of the record to remove. + * @return The matching record if found, null if it is not found. + * @throws IOException If the state store could not be accessed. 
+ */ + private MountTable getMountTableEntry(String mount) throws IOException { + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance(mount); + GetMountTableEntriesResponse response = + mountStore.getMountTableEntries(request); + List<MountTable> results = response.getEntries(); + if (results.size() > 0) { + // First result is sorted to have the shortest mount string length + return results.get(0); + } + return null; + } + + /** + * Fetch all mount table records beneath a root path. + * + * @param store FederationMountTableStore instance to commit the data. + * @param mount The root search path, enter "/" to return all mount table + * records. + * + * @return A list of all mount table records found below the root mount. + * + * @throws IOException If the state store could not be accessed. + */ + private QueryResult<MountTable> getMountTableEntries(String mount) + throws IOException { + if (mount == null) { + throw new IOException("Please specify a root search path"); + } + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance(); + request.setSrcPath(mount); + GetMountTableEntriesResponse response = + mountStore.getMountTableEntries(request); + List<MountTable> records = response.getEntries(); + long timestamp = response.getTimestamp(); + return new QueryResult<MountTable>(records, timestamp); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java new file mode 100644 index 0000000..db1df19 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; +import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.util.Time; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +
/**
 * Exercises the {@link RouterStore} record store registered in the
 * {@link StateStoreService}: router heartbeats, lookups and expiration.
 */
public class TestStateStoreRouterState extends TestStateStoreBase {

  /** Router record store under test; looked up on the first setup. */
  private static RouterStore routerStore;

  /** Lowers the router expiration in the shared configuration to 5 seconds. */
  @BeforeClass
  public static void create() {
    getConf().setTimeDuration(
        RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
        5, TimeUnit.SECONDS);
  }

  /** Resolves the router store once and wipes the router records. */
  @Before
  public void setup() throws IOException, InterruptedException {
    if (routerStore == null) {
      routerStore =
          getStateStore().getRegisteredRecordStore(RouterStore.class);
    }

    // Every test starts without router registrations
    boolean cleared = clearRecords(getStateStore(), RouterState.class);
    assertTrue(cleared);
  }

  /**
   * Builds a heartbeat request for a router in the RUNNING state.
   *
   * @param address Address of the router.
   * @param dateStarted Start date of the router.
   * @return Heartbeat request wrapping the new router record.
   */
  private static RouterHeartbeatRequest newRunningHeartbeat(
      String address, long dateStarted) {
    RouterState running = RouterState.newInstance(
        address, dateStarted, RouterServiceState.RUNNING);
    return RouterHeartbeatRequest.newInstance(running);
  }

  @Test
  public void testStateStoreDisconnected() throws Exception {

    // With the driver closed, every call that reaches the data store is
    // expected to fail with StateStoreUnavailableException
    getStateStore().closeDriver();
    assertFalse(getStateStore().isDriverReady());

    // Single registration lookup
    GetRouterRegistrationRequest singleLookup =
        GetRouterRegistrationRequest.newInstance();
    verifyException(routerStore, "getRouterRegistration",
        StateStoreUnavailableException.class,
        new Class[] {GetRouterRegistrationRequest.class},
        new Object[] {singleLookup});

    // Lookup of all the registrations
    GetRouterRegistrationsRequest allLookup =
        GetRouterRegistrationsRequest.newInstance();
    routerStore.loadCache(true);
    verifyException(routerStore, "getRouterRegistrations",
        StateStoreUnavailableException.class,
        new Class[] {GetRouterRegistrationsRequest.class},
        new Object[] {allLookup});

    // Heartbeat
    RouterState uninitialized =
        RouterState.newInstance("test", 0, RouterServiceState.UNINITIALIZED);
    RouterHeartbeatRequest heartbeat =
        RouterHeartbeatRequest.newInstance(uninitialized);
    verifyException(routerStore, "routerHeartbeat",
        StateStoreUnavailableException.class,
        new Class[] {RouterHeartbeatRequest.class},
        new Object[] {heartbeat});
  }

  //
  // Router
  //
  @Test
  public void testUpdateRouterStatus()
      throws IllegalStateException, IOException {

    final long startTime = Time.now();
    final String routerAddress = "testaddress";

    // Register the router through a heartbeat
    RouterHeartbeatRequest heartbeat =
        newRunningHeartbeat(routerAddress, startTime);
    assertTrue(routerStore.routerHeartbeat(heartbeat).getStatus());

    // Read the registration back and check what was stored
    GetRouterRegistrationRequest lookup =
        GetRouterRegistrationRequest.newInstance(routerAddress);
    RouterState stored = routerStore.getRouterRegistration(lookup).getRouter();
    assertNotNull(stored);
    assertEquals(RouterServiceState.RUNNING, stored.getStatus());
    assertEquals(routerAddress, stored.getAddress());
    assertEquals(FederationUtil.getCompileInfo(), stored.getCompileInfo());
    // The build version may vary a bit, only check that it is present
    assertFalse(stored.getVersion().isEmpty());
  }

  @Test
  public void testRouterStateExpired()
      throws IOException, InterruptedException {

    final long startTime = Time.now();
    final String routerAddress = "testaddress";

    // Register the router through a heartbeat
    RouterHeartbeatRequest heartbeat =
        newRunningHeartbeat(routerAddress, startTime);
    assertTrue(routerStore.routerHeartbeat(heartbeat).getStatus());

    // The registration can be read back
    GetRouterRegistrationRequest lookup =
        GetRouterRegistrationRequest.newInstance(routerAddress);
    RouterState stored = routerStore.getRouterRegistration(lookup).getRouter();
    assertNotNull(stored);

    // Wait past expiration (set to 5 sec in config)
    Thread.sleep(6000);

    // The record is now reported as expired
    RouterState current =
        routerStore.getRouterRegistration(lookup).getRouter();
    assertEquals(RouterServiceState.EXPIRED, current.getStatus());

    // A new heartbeat brings it back to RUNNING
    assertTrue(routerStore.routerHeartbeat(heartbeat).getStatus());
    current = routerStore.getRouterRegistration(lookup).getRouter();
    assertEquals(RouterServiceState.RUNNING, current.getStatus());
  }

  @Test
  public void testGetAllRouterStates()
      throws StateStoreUnavailableException, IOException {

    // Register two routers
    RouterHeartbeatRequest firstHeartbeat =
        newRunningHeartbeat("testaddress1", Time.now());
    assertTrue(routerStore.routerHeartbeat(firstHeartbeat).getStatus());

    RouterHeartbeatRequest secondHeartbeat =
        newRunningHeartbeat("testaddress2", Time.now());
    assertTrue(routerStore.routerHeartbeat(secondHeartbeat).getStatus());

    // Both must show up, in order once sorted, and RUNNING
    routerStore.loadCache(true);
    GetRouterRegistrationsRequest lookupAll =
        GetRouterRegistrationsRequest.newInstance();
    List<RouterState> routers =
        routerStore.getRouterRegistrations(lookupAll).getRouters();
    assertEquals(2, routers.size());
    Collections.sort(routers);
    RouterState first = routers.get(0);
    RouterState second = routers.get(1);
    assertEquals("testaddress1", first.getAddress());
    assertEquals("testaddress2", second.getAddress());
    assertEquals(RouterServiceState.RUNNING, first.getStatus());
    assertEquals(RouterServiceState.RUNNING, second.getStatus());
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java new file mode 100644 index 0000000..fd29e37 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -0,0 +1,613 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; +import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; +import org.junit.After; +import org.junit.AfterClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base tests for the driver. The particular implementations will use this to + * test their functionality. 
+ */ +public class TestStateStoreDriverBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestStateStoreDriverBase.class); + + private static StateStoreService stateStore; + private static Configuration conf; + + private static final Random RANDOM = new Random(); + + + /** + * Get the State Store driver. + * @return State Store driver. + */ + protected StateStoreDriver getStateStoreDriver() { + return stateStore.getDriver(); + } + + @After + public void cleanMetrics() { + if (stateStore != null) { + StateStoreMetrics metrics = stateStore.getMetrics(); + metrics.reset(); + } + } + + @AfterClass + public static void tearDownCluster() { + if (stateStore != null) { + stateStore.stop(); + } + } + + /** + * Get a new State Store using this configuration. + * + * @param config Configuration for the State Store. + * @throws Exception If we cannot get the State Store. + */ + public static void getStateStore(Configuration config) throws Exception { + conf = config; + stateStore = FederationStateStoreTestUtils.newStateStore(conf); + } + + private String generateRandomString() { + String randomString = "randomString-" + RANDOM.nextInt(); + return randomString; + } + + private long generateRandomLong() { + return RANDOM.nextLong(); + } + + @SuppressWarnings("rawtypes") + private <T extends Enum> T generateRandomEnum(Class<T> enumClass) { + int x = RANDOM.nextInt(enumClass.getEnumConstants().length); + T data = enumClass.getEnumConstants()[x]; + return data; + } + + @SuppressWarnings("unchecked") + private <T extends BaseRecord> T generateFakeRecord(Class<T> recordClass) + throws IllegalArgumentException, IllegalAccessException, IOException { + + if (recordClass == MembershipState.class) { + return (T) MembershipState.newInstance(generateRandomString(), + generateRandomString(), generateRandomString(), + generateRandomString(), generateRandomString(), + generateRandomString(), generateRandomString(), + generateRandomString(), generateRandomString(), + 
generateRandomEnum(FederationNamenodeServiceState.class), false); + } else if (recordClass == MountTable.class) { + String src = "/" + generateRandomString(); + Map<String, String> destMap = Collections.singletonMap( + generateRandomString(), "/" + generateRandomString()); + return (T) MountTable.newInstance(src, destMap); + } else if (recordClass == RouterState.class) { + RouterState routerState = RouterState.newInstance(generateRandomString(), + generateRandomLong(), generateRandomEnum(RouterServiceState.class)); + StateStoreVersion version = generateFakeRecord(StateStoreVersion.class); + routerState.setStateStoreVersion(version); + return (T) routerState; + } + + return null; + } + + /** + * Validate if a record is the same. + * + * @param original Original record. + * @param committed Committed record. + * @param assertEquals Assert if the records are equal or just return. + * @return If the record is successfully validated. + */ + private boolean validateRecord( + BaseRecord original, BaseRecord committed, boolean assertEquals) { + + boolean ret = true; + + Map<String, Class<?>> fields = getFields(original); + for (String key : fields.keySet()) { + if (key.equals("dateModified") || + key.equals("dateCreated") || + key.equals("proto")) { + // Fields are updated/set on commit and fetch and may not match + // the fields that are initialized in a non-committed object. 
+ continue; + } + Object data1 = getField(original, key); + Object data2 = getField(committed, key); + if (assertEquals) { + assertEquals("Field " + key + " does not match", data1, data2); + } else if (!data1.equals(data2)) { + ret = false; + } + } + + long now = stateStore.getDriver().getTime(); + assertTrue( + committed.getDateCreated() <= now && committed.getDateCreated() > 0); + assertTrue(committed.getDateModified() >= committed.getDateCreated()); + + return ret; + } + + public static void removeAll(StateStoreDriver driver) throws IOException { + driver.removeAll(MembershipState.class); + driver.removeAll(MountTable.class); + } + + public <T extends BaseRecord> void testInsert( + StateStoreDriver driver, Class<T> recordClass) + throws IllegalArgumentException, IllegalAccessException, IOException { + + assertTrue(driver.removeAll(recordClass)); + QueryResult<T> queryResult0 = driver.get(recordClass); + List<T> records0 = queryResult0.getRecords(); + assertTrue(records0.isEmpty()); + + // Insert single + BaseRecord record = generateFakeRecord(recordClass); + driver.put(record, true, false); + + // Verify + QueryResult<T> queryResult1 = driver.get(recordClass); + List<T> records1 = queryResult1.getRecords(); + assertEquals(1, records1.size()); + T record0 = records1.get(0); + validateRecord(record, record0, true); + + // Insert multiple + List<T> insertList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + T newRecord = generateFakeRecord(recordClass); + insertList.add(newRecord); + } + driver.putAll(insertList, true, false); + + // Verify + QueryResult<T> queryResult2 = driver.get(recordClass); + List<T> records2 = queryResult2.getRecords(); + assertEquals(11, records2.size()); + } + + public <T extends BaseRecord> void testFetchErrors(StateStoreDriver driver, + Class<T> clazz) throws IllegalAccessException, IOException { + + // Fetch empty list + driver.removeAll(clazz); + QueryResult<T> result0 = driver.get(clazz); + assertNotNull(result0); + List<T> 
records0 = result0.getRecords(); + assertEquals(records0.size(), 0); + + // Insert single + BaseRecord record = generateFakeRecord(clazz); + assertTrue(driver.put(record, true, false)); + + // Verify + QueryResult<T> result1 = driver.get(clazz); + List<T> records1 = result1.getRecords(); + assertEquals(1, records1.size()); + validateRecord(record, records1.get(0), true); + + // Test fetch single object with a bad query + final T fakeRecord = generateFakeRecord(clazz); + final Query<T> query = new Query<T>(fakeRecord); + T getRecord = driver.get(clazz, query); + assertNull(getRecord); + + // Test fetch multiple objects does not exist returns empty list + assertEquals(driver.getMultiple(clazz, query).size(), 0); + } + + public <T extends BaseRecord> void testPut( + StateStoreDriver driver, Class<T> clazz) + throws IllegalArgumentException, ReflectiveOperationException, + IOException, SecurityException { + + driver.removeAll(clazz); + QueryResult<T> records = driver.get(clazz); + assertTrue(records.getRecords().isEmpty()); + + // Insert multiple + List<T> insertList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + T newRecord = generateFakeRecord(clazz); + insertList.add(newRecord); + } + + // Verify + assertTrue(driver.putAll(insertList, false, true)); + records = driver.get(clazz); + assertEquals(records.getRecords().size(), 10); + + // Generate a new record with the same PK fields as an existing record + BaseRecord updatedRecord = generateFakeRecord(clazz); + BaseRecord existingRecord = records.getRecords().get(0); + Map<String, String> primaryKeys = existingRecord.getPrimaryKeys(); + for (Entry<String, String> entry : primaryKeys.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + Class<?> fieldType = getFieldType(existingRecord, key); + Object field = fromString(value, fieldType); + assertTrue(setField(updatedRecord, key, field)); + } + + // Attempt an update of an existing entry, but it is not allowed. 
+ assertFalse(driver.put(updatedRecord, false, true)); + + // Verify no update occurred, all original records are unchanged + QueryResult<T> newRecords = driver.get(clazz); + assertTrue(newRecords.getRecords().size() == 10); + assertEquals("A single entry was improperly updated in the store", 10, + countMatchingEntries(records.getRecords(), newRecords.getRecords())); + + // Update the entry (allowing updates) + assertTrue(driver.put(updatedRecord, true, false)); + + // Verify that one entry no longer matches the original set + newRecords = driver.get(clazz); + assertEquals(10, newRecords.getRecords().size()); + assertEquals( + "Record of type " + clazz + " not updated in the store", 9, + countMatchingEntries(records.getRecords(), newRecords.getRecords())); + } + + private int countMatchingEntries( + Collection<? extends BaseRecord> committedList, + Collection<? extends BaseRecord> matchList) { + + int matchingCount = 0; + for (BaseRecord committed : committedList) { + for (BaseRecord match : matchList) { + try { + if (match.getPrimaryKey().equals(committed.getPrimaryKey())) { + if (validateRecord(match, committed, false)) { + matchingCount++; + } + break; + } + } catch (Exception ex) { + } + } + } + return matchingCount; + } + + public <T extends BaseRecord> void testRemove( + StateStoreDriver driver, Class<T> clazz) + throws IllegalArgumentException, IllegalAccessException, IOException { + + // Remove all + assertTrue(driver.removeAll(clazz)); + QueryResult<T> records = driver.get(clazz); + assertTrue(records.getRecords().isEmpty()); + + // Insert multiple + List<T> insertList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + T newRecord = generateFakeRecord(clazz); + insertList.add(newRecord); + } + + // Verify + assertTrue(driver.putAll(insertList, false, true)); + records = driver.get(clazz); + assertEquals(records.getRecords().size(), 10); + + // Remove Single + assertTrue(driver.remove(records.getRecords().get(0))); + + // Verify + records = 
driver.get(clazz); + assertEquals(records.getRecords().size(), 9); + + // Remove with filter + final T firstRecord = records.getRecords().get(0); + final Query<T> query0 = new Query<T>(firstRecord); + assertTrue(driver.remove(clazz, query0) > 0); + + final T secondRecord = records.getRecords().get(1); + final Query<T> query1 = new Query<T>(secondRecord); + assertTrue(driver.remove(clazz, query1) > 0); + + // Verify + records = driver.get(clazz); + assertEquals(records.getRecords().size(), 7); + + // Remove all + assertTrue(driver.removeAll(clazz)); + + // Verify + records = driver.get(clazz); + assertTrue(records.getRecords().isEmpty()); + } + + public void testInsert(StateStoreDriver driver) + throws IllegalArgumentException, IllegalAccessException, IOException { + testInsert(driver, MembershipState.class); + testInsert(driver, MountTable.class); + } + + public void testPut(StateStoreDriver driver) + throws IllegalArgumentException, ReflectiveOperationException, + IOException, SecurityException { + testPut(driver, MembershipState.class); + testPut(driver, MountTable.class); + } + + public void testRemove(StateStoreDriver driver) + throws IllegalArgumentException, IllegalAccessException, IOException { + testRemove(driver, MembershipState.class); + testRemove(driver, MountTable.class); + } + + public void testFetchErrors(StateStoreDriver driver) + throws IllegalArgumentException, IllegalAccessException, IOException { + testFetchErrors(driver, MembershipState.class); + testFetchErrors(driver, MountTable.class); + } + + public void testMetrics(StateStoreDriver driver) + throws IOException, IllegalArgumentException, IllegalAccessException { + + MountTable insertRecord = + this.generateFakeRecord(MountTable.class); + + // Put single + StateStoreMetrics metrics = stateStore.getMetrics(); + assertEquals(0, metrics.getWriteOps()); + driver.put(insertRecord, true, false); + assertEquals(1, metrics.getWriteOps()); + + // Put multiple + metrics.reset(); + assertEquals(0, 
metrics.getWriteOps()); + driver.put(insertRecord, true, false); + assertEquals(1, metrics.getWriteOps()); + + // Get Single + metrics.reset(); + assertEquals(0, metrics.getReadOps()); + + final String querySourcePath = insertRecord.getSourcePath(); + MountTable partial = MountTable.newInstance(); + partial.setSourcePath(querySourcePath); + final Query<MountTable> query = new Query<>(partial); + driver.get(MountTable.class, query); + assertEquals(1, metrics.getReadOps()); + + // GetAll + metrics.reset(); + assertEquals(0, metrics.getReadOps()); + driver.get(MountTable.class); + assertEquals(1, metrics.getReadOps()); + + // GetMultiple + metrics.reset(); + assertEquals(0, metrics.getReadOps()); + driver.getMultiple(MountTable.class, query); + assertEquals(1, metrics.getReadOps()); + + // Insert fails + metrics.reset(); + assertEquals(0, metrics.getFailureOps()); + driver.put(insertRecord, false, true); + assertEquals(1, metrics.getFailureOps()); + + // Remove single + metrics.reset(); + assertEquals(0, metrics.getRemoveOps()); + driver.remove(insertRecord); + assertEquals(1, metrics.getRemoveOps()); + + // Remove multiple + metrics.reset(); + driver.put(insertRecord, true, false); + assertEquals(0, metrics.getRemoveOps()); + driver.remove(MountTable.class, query); + assertEquals(1, metrics.getRemoveOps()); + + // Remove all + metrics.reset(); + driver.put(insertRecord, true, false); + assertEquals(0, metrics.getRemoveOps()); + driver.removeAll(MountTable.class); + assertEquals(1, metrics.getRemoveOps()); + } + + /** + * Sets the value of a field on the object. + * + * @param fieldName The string name of the field. + * @param data The data to pass to the field's setter. + * + * @return True if successful, fails if failed. 
+ */ + private static boolean setField( + BaseRecord record, String fieldName, Object data) { + + Method m = locateSetter(record, fieldName); + if (m != null) { + try { + m.invoke(record, data); + } catch (Exception e) { + LOG.error("Cannot set field " + fieldName + " on object " + + record.getClass().getName() + " to data " + data + " of type " + + data.getClass(), e); + return false; + } + } + return true; + } + + /** + * Finds the appropriate setter for a field name. + * + * @param fieldName The legacy name of the field. + * @return The matching setter or null if not found. + */ + private static Method locateSetter(BaseRecord record, String fieldName) { + for (Method m : record.getClass().getMethods()) { + if (m.getName().equalsIgnoreCase("set" + fieldName)) { + return m; + } + } + return null; + } + + /** + * Returns all serializable fields in the object. + * + * @return Map with the fields. + */ + private static Map<String, Class<?>> getFields(BaseRecord record) { + Map<String, Class<?>> getters = new HashMap<>(); + for (Method m : record.getClass().getDeclaredMethods()) { + if (m.getName().startsWith("get")) { + try { + Class<?> type = m.getReturnType(); + char[] c = m.getName().substring(3).toCharArray(); + c[0] = Character.toLowerCase(c[0]); + String key = new String(c); + getters.put(key, type); + } catch (Exception e) { + LOG.error("Cannot execute getter " + m.getName() + + " on object " + record); + } + } + } + return getters; + } + + /** + * Get the type of a field. + * + * @param fieldName + * @return Field type + */ + private static Class<?> getFieldType(BaseRecord record, String fieldName) { + Method m = locateGetter(record, fieldName); + return m.getReturnType(); + } + + /** + * Fetches the value for a field name. + * + * @param fieldName the legacy name of the field. + * @return The field data or null if not found. 
+ */ + private static Object getField(BaseRecord record, String fieldName) { + Object result = null; + Method m = locateGetter(record, fieldName); + if (m != null) { + try { + result = m.invoke(record); + } catch (Exception e) { + LOG.error("Cannot get field " + fieldName + " on object " + record); + } + } + return result; + } + + /** + * Finds the appropriate getter for a field name. + * + * @param fieldName The legacy name of the field. + * @return The matching getter or null if not found. + */ + private static Method locateGetter(BaseRecord record, String fieldName) { + for (Method m : record.getClass().getMethods()) { + if (m.getName().equalsIgnoreCase("get" + fieldName)) { + return m; + } + } + return null; + } + + /** + * Expands a data object from the store into an record object. Default store + * data type is a String. Override if additional serialization is required. + * + * @param data Object containing the serialized data. Only string is + * supported. + * @param clazz Target object class to hold the deserialized data. + * @return An instance of the target data object initialized with the + * deserialized data. 
+ */ + @Deprecated + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static <T> T fromString(String data, Class<T> clazz) { + + if (data.equals("null")) { + return null; + } else if (clazz == String.class) { + return (T) data; + } else if (clazz == Long.class || clazz == long.class) { + return (T) Long.valueOf(data); + } else if (clazz == Integer.class || clazz == int.class) { + return (T) Integer.valueOf(data); + } else if (clazz == Double.class || clazz == double.class) { + return (T) Double.valueOf(data); + } else if (clazz == Float.class || clazz == float.class) { + return (T) Float.valueOf(data); + } else if (clazz == Boolean.class || clazz == boolean.class) { + return (T) Boolean.valueOf(data); + } else if (clazz.isEnum()) { + return (T) Enum.valueOf((Class<Enum>) clazz, data); + } + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java new file mode 100644 index 0000000..a8a9020 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +
/**
 * Runs the common State Store driver tests with a State Store configured for
 * the {@link StateStoreFileImpl} driver.
 */
public class TestStateStoreFile extends TestStateStoreDriverBase {

  /** Creates the State Store from a {@link StateStoreFileImpl} configuration. */
  @BeforeClass
  public static void setupCluster() throws Exception {
    Configuration fileDriverConf =
        getStateStoreConfiguration(StateStoreFileImpl.class);
    getStateStore(fileDriverConf);
  }

  /** Empties the store before each test. */
  @Before
  public void startup() throws IOException {
    StateStoreDriver driver = getStateStoreDriver();
    removeAll(driver);
  }

  @Test
  public void testInsert()
      throws IllegalArgumentException, IllegalAccessException, IOException {
    StateStoreDriver driver = getStateStoreDriver();
    testInsert(driver);
  }

  @Test
  public void testUpdate()
      throws IllegalArgumentException, ReflectiveOperationException,
      IOException, SecurityException {
    StateStoreDriver driver = getStateStoreDriver();
    testPut(driver);
  }

  @Test
  public void testDelete()
      throws IllegalArgumentException, IllegalAccessException, IOException {
    StateStoreDriver driver = getStateStoreDriver();
    testRemove(driver);
  }

  @Test
  public void testFetchErrors()
      throws IllegalArgumentException, IllegalAccessException, IOException {
    StateStoreDriver driver = getStateStoreDriver();
    testFetchErrors(driver);
  }

  @Test
  public void testMetrics()
      throws IllegalArgumentException, IllegalAccessException, IOException {
    StateStoreDriver driver = getStateStoreDriver();
    testMetrics(driver);
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java new file mode 100644 index 0000000..9adfe33 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileBase.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl.isOldTempRecord; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.util.Time; +import org.junit.Test; + +/** + * Tests for the State Store file based implementation. + */ +public class TestStateStoreFileBase { + + @Test + public void testTempOld() { + assertFalse(isOldTempRecord("test.txt")); + assertFalse(isOldTempRecord("testfolder/test.txt")); + + long tnow = Time.now(); + String tmpFile1 = "test." + tnow + ".tmp"; + assertFalse(isOldTempRecord(tmpFile1)); + + long told = Time.now() - TimeUnit.MINUTES.toMillis(1); + String tmpFile2 = "test." + told + ".tmp"; + assertTrue(isOldTempRecord(tmpFile2)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java new file mode 100644 index 0000000..8c4b188 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the FileSystem (e.g., HDFS) implementation of the State Store driver. 
+ */
+public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
+
+  /** HDFS mini cluster created in {@link #setupCluster()}. */
+  private static MiniDFSCluster dfsCluster;
+
+  /**
+   * Builds the State Store configuration for the FileSystem driver, starts a
+   * mini cluster with one DataNode from it and passes the same configuration
+   * to {@code getStateStore}.
+   *
+   * @throws Exception If the cluster or the State Store fail to start.
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    Configuration fsStoreConf =
+        FederationStateStoreTestUtils.getStateStoreConfiguration(
+            StateStoreFileSystemImpl.class);
+    fsStoreConf.set(
+        StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH,
+        "/hdfs-federation/");
+
+    // Create the HDFS cluster that backs the State Store
+    dfsCluster = new MiniDFSCluster.Builder(fsStoreConf)
+        .numDataNodes(1)
+        .build();
+    dfsCluster.waitClusterUp();
+    getStateStore(fsStoreConf);
+  }
+
+  /**
+   * Shuts the mini cluster down if one was created.
+   */
+  @AfterClass
+  public static void tearDownCluster() {
+    if (dfsCluster == null) {
+      return;
+    }
+    dfsCluster.shutdown();
+  }
+
+  /**
+   * Calls {@code removeAll} on the State Store driver before every test.
+   *
+   * @throws IOException If {@code removeAll} fails.
+   */
+  @Before
+  public void startup() throws IOException {
+    removeAll(getStateStoreDriver());
+  }
+
+  /** Runs the inherited insert scenario against the FileSystem driver. */
+  @Test
+  public void testInsert()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testInsert(getStateStoreDriver());
+  }
+
+  /** Runs the inherited put scenario against the FileSystem driver. */
+  @Test
+  public void testUpdate() throws IllegalArgumentException, IOException,
+      SecurityException, ReflectiveOperationException {
+    testPut(getStateStoreDriver());
+  }
+
+  /** Runs the inherited remove scenario against the FileSystem driver. */
+  @Test
+  public void testDelete()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testRemove(getStateStoreDriver());
+  }
+
+  /** Runs the inherited fetch-errors scenario against the FileSystem driver. */
+  @Test
+  public void testFetchErrors()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testFetchErrors(getStateStoreDriver());
+  }
+
+  /** Runs the inherited metrics scenario against the FileSystem driver. */
+  @Test
+  public void testMetrics()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testMetrics(getStateStoreDriver());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java new file mode 100644 index 0000000..3cf7c91 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreZK.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the ZooKeeper implementation of the State Store driver.
+ */
+public class TestStateStoreZK extends TestStateStoreDriverBase {
+
+  /** Curator testing ZooKeeper server started in {@link #setupCluster()}. */
+  private static TestingServer curatorTestingServer;
+  /** Curator client connected to {@link #curatorTestingServer}. */
+  private static CuratorFramework curatorFramework;
+
+  /**
+   * Starts a Curator {@link TestingServer} and a client connected to it, then
+   * passes a ZooKeeper State Store configuration pointing at that server to
+   * {@code getStateStore}.
+   *
+   * @throws Exception If the server, the client or the State Store fail to
+   *           start.
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    curatorTestingServer = new TestingServer();
+    curatorTestingServer.start();
+    String connectString = curatorTestingServer.getConnectString();
+    curatorFramework = CuratorFrameworkFactory.builder()
+        .connectString(connectString)
+        .retryPolicy(new RetryNTimes(100, 100))
+        .build();
+    curatorFramework.start();
+
+    // Create the ZK State Store
+    Configuration conf =
+        getStateStoreConfiguration(StateStoreZooKeeperImpl.class);
+    conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
+    // Disable auto-repair of connection
+    conf.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS,
+        TimeUnit.HOURS.toMillis(1));
+    getStateStore(conf);
+  }
+
+  /**
+   * Closes the Curator client and stops the testing server.
+   *
+   * Both fields are null-checked because JUnit runs this method even when
+   * {@link #setupCluster()} failed part-way; an unguarded dereference would
+   * replace the real setup failure with a NullPointerException. The server
+   * is stopped in a {@code finally} block so that a failure while closing
+   * the client cannot leave it running.
+   */
+  @AfterClass
+  public static void tearDownCluster() {
+    try {
+      if (curatorFramework != null) {
+        curatorFramework.close();
+      }
+    } finally {
+      if (curatorTestingServer != null) {
+        try {
+          curatorTestingServer.stop();
+        } catch (IOException ignored) {
+          // Best-effort shutdown: all tests have already run and there is
+          // nothing left to recover if the testing server fails to stop.
+        }
+      }
+    }
+  }
+
+  /**
+   * Calls {@code removeAll} on the State Store driver before every test.
+   *
+   * @throws IOException If {@code removeAll} fails.
+   */
+  @Before
+  public void startup() throws IOException {
+    removeAll(getStateStoreDriver());
+  }
+
+  /** Runs the inherited insert scenario against the ZooKeeper driver. */
+  @Test
+  public void testInsert()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testInsert(getStateStoreDriver());
+  }
+
+  /** Runs the inherited put scenario against the ZooKeeper driver. */
+  @Test
+  public void testUpdate()
+      throws IllegalArgumentException, ReflectiveOperationException,
+      IOException, SecurityException {
+    testPut(getStateStoreDriver());
+  }
+
+  /** Runs the inherited remove scenario against the ZooKeeper driver. */
+  @Test
+  public void testDelete()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testRemove(getStateStoreDriver());
+  }
+
+  /** Runs the inherited fetch-errors scenario against the ZooKeeper driver. */
+  @Test
+  public void testFetchErrors()
+      throws IllegalArgumentException, IllegalAccessException, IOException {
+    testFetchErrors(getStateStoreDriver());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java
new file mode 100644
index 0000000..d922414
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMembershipState.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.junit.Test;
+
+/**
+ * Test the Membership State records.
+ */
+public class TestMembershipState {
+
+  // Identity and address values stored in the record under test. Every
+  // constant is distinct so that a swapped field is detected.
+  private static final String ROUTER = "router";
+  private static final String NAMESERVICE = "nameservice";
+  private static final String NAMENODE = "namenode";
+  private static final String CLUSTER_ID = "cluster";
+  private static final String BLOCKPOOL_ID = "blockpool";
+  private static final String RPC_ADDRESS = "rpcaddress";
+  private static final String SERVICE_ADDRESS = "serviceaddress";
+  private static final String LIFELINE_ADDRESS = "lifelineaddress";
+  private static final String WEB_ADDRESS = "webaddress";
+  private static final boolean SAFE_MODE = false;
+
+  private static final long DATE_CREATED = 100;
+  private static final long DATE_MODIFIED = 200;
+
+  // Values stored in the MembershipStats attached to the record
+  private static final long NUM_BLOCKS = 300;
+  private static final long NUM_FILES = 400;
+  private static final int NUM_DEAD = 500;
+  private static final int NUM_ACTIVE = 600;
+  private static final int NUM_DECOM = 700;
+  private static final int NUM_DECOM_ACTIVE = 800;
+  private static final int NUM_DECOM_DEAD = 900;
+  private static final long NUM_BLOCK_MISSING = 1000;
+
+  private static final long TOTAL_SPACE = 1100;
+  private static final long AVAILABLE_SPACE = 1200;
+
+  private static final FederationNamenodeServiceState STATE =
+      FederationNamenodeServiceState.ACTIVE;
+
+  /**
+   * Creates a membership record, with stats attached, populated from the
+   * constants of this class.
+   *
+   * @return The populated record.
+   * @throws IOException Declared by the record factory methods.
+   */
+  private MembershipState createRecord() throws IOException {
+
+    MembershipState record = MembershipState.newInstance(
+        ROUTER, NAMESERVICE, NAMENODE, CLUSTER_ID,
+        BLOCKPOOL_ID, RPC_ADDRESS, SERVICE_ADDRESS, LIFELINE_ADDRESS,
+        WEB_ADDRESS, STATE, SAFE_MODE);
+    record.setDateCreated(DATE_CREATED);
+    record.setDateModified(DATE_MODIFIED);
+
+    MembershipStats stats = MembershipStats.newInstance();
+    stats.setNumOfBlocks(NUM_BLOCKS);
+    stats.setNumOfFiles(NUM_FILES);
+    stats.setNumOfActiveDatanodes(NUM_ACTIVE);
+    stats.setNumOfDeadDatanodes(NUM_DEAD);
+    stats.setNumOfDecommissioningDatanodes(NUM_DECOM);
+    stats.setNumOfDecomActiveDatanodes(NUM_DECOM_ACTIVE);
+    stats.setNumOfDecomDeadDatanodes(NUM_DECOM_DEAD);
+    stats.setNumOfBlocksMissing(NUM_BLOCK_MISSING);
+    stats.setTotalSpace(TOTAL_SPACE);
+    stats.setAvailableSpace(AVAILABLE_SPACE);
+    record.setStats(stats);
+    return record;
+  }
+
+  /**
+   * Asserts that every value written by {@link #createRecord()} can be read
+   * back from the given record and from its stats.
+   *
+   * @param record Record to check.
+   * @throws IOException Declared for the callers; not thrown by the visible
+   *           assertions.
+   */
+  private void validateRecord(MembershipState record) throws IOException {
+
+    assertEquals(ROUTER, record.getRouterId());
+    assertEquals(NAMESERVICE, record.getNameserviceId());
+    // The namenode id and the service/lifeline addresses are passed to
+    // newInstance() as well, so they are verified like the other fields.
+    assertEquals(NAMENODE, record.getNamenodeId());
+    assertEquals(CLUSTER_ID, record.getClusterId());
+    assertEquals(BLOCKPOOL_ID, record.getBlockPoolId());
+    assertEquals(RPC_ADDRESS, record.getRpcAddress());
+    assertEquals(SERVICE_ADDRESS, record.getServiceAddress());
+    assertEquals(LIFELINE_ADDRESS, record.getLifelineAddress());
+    assertEquals(WEB_ADDRESS, record.getWebAddress());
+    assertEquals(STATE, record.getState());
+    assertEquals(SAFE_MODE, record.getIsSafeMode());
+    assertEquals(DATE_CREATED, record.getDateCreated());
+    assertEquals(DATE_MODIFIED, record.getDateModified());
+
+    MembershipStats stats = record.getStats();
+    assertEquals(NUM_BLOCKS, stats.getNumOfBlocks());
+    assertEquals(NUM_FILES, stats.getNumOfFiles());
+    assertEquals(NUM_ACTIVE, stats.getNumOfActiveDatanodes());
+    assertEquals(NUM_DEAD, stats.getNumOfDeadDatanodes());
+    assertEquals(NUM_DECOM, stats.getNumOfDecommissioningDatanodes());
+    assertEquals(NUM_DECOM_ACTIVE, stats.getNumOfDecomActiveDatanodes());
+    assertEquals(NUM_DECOM_DEAD, stats.getNumOfDecomDeadDatanodes());
+    assertEquals(NUM_BLOCK_MISSING, stats.getNumOfBlocksMissing());
+    assertEquals(TOTAL_SPACE, stats.getTotalSpace());
+    assertEquals(AVAILABLE_SPACE, stats.getAvailableSpace());
+  }
+
+  /**
+   * Checks that a freshly built record returns the values it was given.
+   */
+  @Test
+  public void testGetterSetter() throws IOException {
+    MembershipState record = createRecord();
+    validateRecord(record);
+  }
+
+  /**
+   * Checks that a record serialized to a string and deserialized again still
+   * returns the values it was given.
+   */
+  @Test
+  public void testSerialization() throws IOException {
+
+    MembershipState record = createRecord();
+
+    StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
+    String serializedString = serializer.serializeString(record);
+    MembershipState newRecord =
+        serializer.deserialize(serializedString, MembershipState.class);
+
+    validateRecord(newRecord);
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org