Repository: hive Updated Branches: refs/heads/master 79e88695c -> 26c0ab6ad
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java index 0006815..a72fc0b 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java @@ -22,7 +22,10 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; @@ -66,19 +69,13 @@ public class TestCachedStore { objectStore = new ObjectStore(); objectStore.setConf(conf); cachedStore = new CachedStore(); - cachedStore.setConf(conf); - // Stop the CachedStore cache update service. We'll start it explicitly to control the test - CachedStore.stopCacheUpdateService(1); - cachedStore.setInitializedForTest(); - + cachedStore.setConfForTest(conf); // Stop the CachedStore cache update service. We'll start it explicitly to control the test CachedStore.stopCacheUpdateService(1); sharedCache = new SharedCache(); sharedCache.getDatabaseCache().clear(); sharedCache.getTableCache().clear(); - sharedCache.getPartitionCache().clear(); sharedCache.getSdCache().clear(); - sharedCache.getPartitionColStatsCache().clear(); } /********************************************************************************************** @@ -89,61 +86,49 @@ public class TestCachedStore { public void testDatabaseOps() throws Exception { // Add a db via ObjectStore String dbName = "testDatabaseOps"; - String dbDescription = "testDatabaseOps"; - String dbLocation = "file:/tmp"; - Map<String, String> dbParams = new HashMap<>(); String dbOwner = "user1"; - Database db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); + Database db = createTestDb(dbName, dbOwner); objectStore.createDatabase(db); db = objectStore.getDatabase(dbName); // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); CachedStore.prewarm(objectStore); // Read database via CachedStore - Database dbNew = cachedStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); + Database dbRead = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbRead); // Add another db via CachedStore final String dbName1 = "testDatabaseOps1"; - final String dbDescription1 = "testDatabaseOps1"; - Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams); - db1.setOwnerName(dbOwner); - db1.setOwnerType(PrincipalType.USER); + Database db1 = createTestDb(dbName1, dbOwner); cachedStore.createDatabase(db1); db1 = cachedStore.getDatabase(dbName1); // Read db via ObjectStore - dbNew = objectStore.getDatabase(dbName1); - Assert.assertEquals(db1, dbNew); + dbRead = objectStore.getDatabase(dbName1); + Assert.assertEquals(db1, dbRead); // Alter the db via CachedStore (can only alter owner or parameters) - db = new Database(dbName, dbDescription, dbLocation, dbParams); dbOwner = "user2"; + db = new Database(db); db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); cachedStore.alterDatabase(dbName, db); db = cachedStore.getDatabase(dbName); // Read db via ObjectStore - dbNew = objectStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); + dbRead = objectStore.getDatabase(dbName); + Assert.assertEquals(db, dbRead); // Add another db via ObjectStore final String dbName2 = "testDatabaseOps2"; - final String dbDescription2 = "testDatabaseOps2"; - Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams); - db2.setOwnerName(dbOwner); - db2.setOwnerType(PrincipalType.USER); + Database db2 = createTestDb(dbName2, dbOwner); objectStore.createDatabase(db2); db2 = objectStore.getDatabase(dbName2); // Alter db "testDatabaseOps" via ObjectStore dbOwner = "user1"; - db = new Database(dbName, dbDescription, dbLocation, dbParams); + db = new Database(db); db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); objectStore.alterDatabase(dbName, db); db = objectStore.getDatabase(dbName); @@ -151,20 +136,20 @@ public class TestCachedStore { objectStore.dropDatabase(dbName1); // We update twice to accurately detect if cache is dirty or not - updateCache(cachedStore, 100, 500, 100); - updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore); + updateCache(cachedStore); // Read the newly added db via CachedStore - dbNew = cachedStore.getDatabase(dbName2); - Assert.assertEquals(db2, dbNew); + dbRead = cachedStore.getDatabase(dbName2); + Assert.assertEquals(db2, dbRead); // Read the altered db via CachedStore (altered user from "user2" to "user1") - dbNew = cachedStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); + dbRead = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbRead); // Try to read the dropped db after cache update try { - dbNew = cachedStore.getDatabase(dbName1); + dbRead = cachedStore.getDatabase(dbName1); Assert.fail("The database: " + dbName1 + " should have been removed from the cache after running the update service"); } catch (NoSuchObjectException e) { @@ -174,78 +159,64 @@ public class TestCachedStore { // Clean up objectStore.dropDatabase(dbName); objectStore.dropDatabase(dbName2); + sharedCache.getDatabaseCache().clear(); + sharedCache.getTableCache().clear(); + sharedCache.getSdCache().clear(); } @Test public void testTableOps() throws Exception { // Add a db via ObjectStore String dbName = "testTableOps"; - String dbDescription = "testTableOps"; - String dbLocation = "file:/tmp"; - Map<String, String> dbParams = new HashMap<>(); String dbOwner = "user1"; - Database db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); + Database db = createTestDb(dbName, dbOwner); objectStore.createDatabase(db); db = objectStore.getDatabase(dbName); // Add a table via ObjectStore String tblName = "tbl"; String tblOwner = "user1"; - String serdeLocation = "file:/tmp"; FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); FieldSchema col2 = new FieldSchema("col2", "string", "string column"); - List<FieldSchema> cols = new ArrayList<>(); + List<FieldSchema> cols = new ArrayList<FieldSchema>(); cols.add(col1); cols.add(col2); - Map<String, String> serdeParams = new HashMap<>(); - Map<String, String> tblParams = new HashMap<>(); - SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>()); - StorageDescriptor sd = - new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, - null, serdeParams); - sd.setStoredAsSubDirectories(false); - Table tbl = - new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); + Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols); objectStore.createTable(tbl); tbl = objectStore.getTable(dbName, tblName); // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); CachedStore.prewarm(objectStore); // Read database, table via CachedStore - Database dbNew = cachedStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); - Table tblNew = cachedStore.getTable(dbName, tblName); - Assert.assertEquals(tbl, tblNew); + Database dbRead= cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbRead); + Table tblRead = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblRead); // Add a new table via CachedStore String tblName1 = "tbl1"; - Table tbl1 = - new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); + Table tbl1 = new Table(tbl); + tbl1.setTableName(tblName1); cachedStore.createTable(tbl1); tbl1 = cachedStore.getTable(dbName, tblName1); // Read via object store - tblNew = objectStore.getTable(dbName, tblName1); - Assert.assertEquals(tbl1, tblNew); + tblRead = objectStore.getTable(dbName, tblName1); + Assert.assertEquals(tbl1, tblRead); // Add a new table via ObjectStore String tblName2 = "tbl2"; - Table tbl2 = - new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); + Table tbl2 = new Table(tbl); + tbl2.setTableName(tblName2); objectStore.createTable(tbl2); tbl2 = objectStore.getTable(dbName, tblName2); // Alter table "tbl" via ObjectStore tblOwner = "user2"; - tbl = - new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); + tbl.setOwner(tblOwner); objectStore.alterTable(dbName, tblName, tbl); tbl = objectStore.getTable(dbName, tblName); @@ -253,20 +224,20 @@ public class TestCachedStore { objectStore.dropTable(dbName, tblName1); // We update twice to accurately detect if cache is dirty or not - updateCache(cachedStore, 100, 500, 100); - updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore); + updateCache(cachedStore); // Read "tbl2" via CachedStore - tblNew = cachedStore.getTable(dbName, tblName2); - Assert.assertEquals(tbl2, tblNew); + tblRead = cachedStore.getTable(dbName, tblName2); + Assert.assertEquals(tbl2, tblRead); // Read the altered "tbl" via CachedStore - tblNew = cachedStore.getTable(dbName, tblName); - Assert.assertEquals(tbl, tblNew); + tblRead = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblRead); // Try to read the dropped "tbl1" via CachedStore (should throw exception) - tblNew = cachedStore.getTable(dbName, tblName1); - Assert.assertNull(tblNew); + tblRead = cachedStore.getTable(dbName, tblName1); + Assert.assertNull(tblRead); // Should return "tbl" and "tbl2" List<String> tblNames = cachedStore.getTables(dbName, "*"); @@ -278,81 +249,72 @@ public class TestCachedStore { objectStore.dropTable(dbName, tblName); objectStore.dropTable(dbName, tblName2); objectStore.dropDatabase(dbName); + sharedCache.getDatabaseCache().clear(); + sharedCache.getTableCache().clear(); + sharedCache.getSdCache().clear(); } @Test public void testPartitionOps() throws Exception { // Add a db via ObjectStore String dbName = "testPartitionOps"; - String dbDescription = "testPartitionOps"; - String dbLocation = "file:/tmp"; - Map<String, String> dbParams = new HashMap<>(); String dbOwner = "user1"; - Database db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); + Database db = createTestDb(dbName, dbOwner); objectStore.createDatabase(db); db = objectStore.getDatabase(dbName); // Add a table via ObjectStore String tblName = "tbl"; String tblOwner = "user1"; - String serdeLocation = "file:/tmp"; FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); FieldSchema col2 = new FieldSchema("col2", "string", "string column"); - List<FieldSchema> cols = new ArrayList<>(); + List<FieldSchema> cols = new ArrayList<FieldSchema>(); cols.add(col1); cols.add(col2); - Map<String, String> serdeParams = new HashMap<>(); - Map<String, String> tblParams = new HashMap<>(); - SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = - new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, - null, serdeParams); FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); - List<FieldSchema> ptnCols = new ArrayList<>(); + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); ptnCols.add(ptnCol1); - Table tbl = - new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, - TableType.MANAGED_TABLE.toString()); + Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols); objectStore.createTable(tbl); tbl = objectStore.getTable(dbName, tblName); + final String ptnColVal1 = "aaa"; - Map<String, String> partParams = new HashMap<>(); + Map<String, String> partParams = new HashMap<String, String>(); Partition ptn1 = - new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams); + new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, tbl.getSd(), partParams); objectStore.addPartition(ptn1); ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); final String ptnColVal2 = "bbb"; Partition ptn2 = - new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams); + new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, tbl.getSd(), partParams); objectStore.addPartition(ptn2); ptn2 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); CachedStore.prewarm(objectStore); // Read database, table, partition via CachedStore - Database dbNew = cachedStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); - Table tblNew = cachedStore.getTable(dbName, tblName); - Assert.assertEquals(tbl, tblNew); - Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); - Assert.assertEquals(ptn1, newPtn1); - Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); - Assert.assertEquals(ptn2, newPtn2); + Database dbRead = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbRead); + Table tblRead = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblRead); + Partition ptn1Read = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); + Assert.assertEquals(ptn1, ptn1Read); + Partition ptn2Read = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + Assert.assertEquals(ptn2, ptn2Read); // Add a new partition via ObjectStore final String ptnColVal3 = "ccc"; Partition ptn3 = - new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams); + new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, tbl.getSd(), partParams); objectStore.addPartition(ptn3); ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); // Alter an existing partition ("aaa") via ObjectStore final String ptnColVal1Alt = "aaaAlt"; Partition ptn1Atl = - new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams); + new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, tbl.getSd(), partParams); objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Atl); ptn1Atl = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); @@ -360,45 +322,47 @@ public class TestCachedStore { objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2)); // We update twice to accurately detect if cache is dirty or not - updateCache(cachedStore, 100, 500, 100); - updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore); + updateCache(cachedStore); // Read the newly added partition via CachedStore - Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); - Assert.assertEquals(ptn3, newPtn); + Partition ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); + Assert.assertEquals(ptn3, ptnRead); // Read the altered partition via CachedStore - newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); - Assert.assertEquals(ptn1Atl, newPtn); + ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); + Assert.assertEquals(ptn1Atl, ptnRead); // Try to read the dropped partition via CachedStore try { - newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); Assert.fail("The partition: " + ptnColVal2 + " should have been removed from the cache after running the update service"); } catch (NoSuchObjectException e) { // Expected } + // Clean up + objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); + objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal3)); + objectStore.dropTable(dbName, tblName); + objectStore.dropDatabase(dbName); + sharedCache.getDatabaseCache().clear(); + sharedCache.getTableCache().clear(); + sharedCache.getSdCache().clear(); } //@Test public void testTableColStatsOps() throws Exception { // Add a db via ObjectStore String dbName = "testTableColStatsOps"; - String dbDescription = "testTableColStatsOps"; - String dbLocation = "file:/tmp"; - Map<String, String> dbParams = new HashMap<>(); String dbOwner = "user1"; - Database db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); + Database db = createTestDb(dbName, dbOwner); objectStore.createDatabase(db); db = objectStore.getDatabase(dbName); // Add a table via ObjectStore final String tblName = "tbl"; final String tblOwner = "user1"; - final String serdeLocation = "file:/tmp"; final FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); // Stats values for col1 long col1LowVal = 5; @@ -420,15 +384,10 @@ public class TestCachedStore { cols.add(col1); cols.add(col2); cols.add(col3); - Map<String, String> serdeParams = new HashMap<>(); - Map<String, String> tblParams = new HashMap<>(); - final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = - new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, - null, serdeParams); - Table tbl = - new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); + FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); + ptnCols.add(ptnCol1); + Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols); objectStore.createTable(tbl); tbl = objectStore.getTable(dbName, tblName); @@ -476,6 +435,7 @@ public class TestCachedStore { objectStore.updateTableColumnStatistics(stats); // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); CachedStore.prewarm(objectStore); // Read table stats via CachedStore @@ -483,18 +443,13 @@ public class TestCachedStore { cachedStore.getTableColumnStatistics(dbName, tblName, Arrays.asList(col1.getName(), col2.getName(), col3.getName())); Assert.assertEquals(stats, newStats); - } - private void updateCache(CachedStore cachedStore, long frequency, long sleepTime, - long shutdownTimeout) throws InterruptedException { - // Set cache refresh period to 100 milliseconds - CachedStore.setCacheRefreshPeriod(100); - // Start the CachedStore update service - CachedStore.startCacheUpdateService(cachedStore.getConf()); - // Sleep for 500 ms so that cache update is complete - Thread.sleep(500); - // Stop cache update service - CachedStore.stopCacheUpdateService(100); + // Clean up + objectStore.dropTable(dbName, tblName); + objectStore.dropDatabase(dbName); + sharedCache.getDatabaseCache().clear(); + sharedCache.getTableCache().clear(); + sharedCache.getSdCache().clear(); } /********************************************************************************************** @@ -503,29 +458,21 @@ public class TestCachedStore { @Test public void testSharedStoreDb() { - Database db1 = new Database(); - Database db2 = new Database(); - Database db3 = new Database(); - Database newDb1 = new Database(); - newDb1.setName("db1"); - - sharedCache.addDatabaseToCache("db1", db1); - sharedCache.addDatabaseToCache("db2", db2); - sharedCache.addDatabaseToCache("db3", db3); - + Database db1 = createTestDb("db1", "user1"); + Database db2 = createTestDb("db2", "user1"); + Database db3 = createTestDb("db3", "user1"); + Database newDb1 = createTestDb("newdb1", "user1"); + sharedCache.addDatabaseToCache(db1); + sharedCache.addDatabaseToCache(db2); + sharedCache.addDatabaseToCache(db3); Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3); - sharedCache.alterDatabaseInCache("db1", newDb1); - Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3); - sharedCache.removeDatabaseFromCache("db2"); - Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 2); - List<String> dbs = sharedCache.listCachedDatabases(); Assert.assertEquals(dbs.size(), 2); - Assert.assertTrue(dbs.contains("db1")); + Assert.assertTrue(dbs.contains("newdb1")); Assert.assertTrue(dbs.contains("db3")); } @@ -608,6 +555,23 @@ public class TestCachedStore { @Test public void testSharedStorePartition() { + String dbName = "db1"; + String tbl1Name = "tbl1"; + String tbl2Name = "tbl2"; + String owner = "user1"; + Database db = createTestDb(dbName, owner); + sharedCache.addDatabaseToCache(db); + FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + cols.add(col1); + cols.add(col2); + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); + Table tbl1 = createTestTbl(dbName, tbl1Name, owner, cols, ptnCols); + sharedCache.addTableToCache(dbName, tbl1Name, tbl1); + Table tbl2 = createTestTbl(dbName, tbl2Name, owner, cols, ptnCols); + sharedCache.addTableToCache(dbName, tbl2Name, tbl2); + Partition part1 = new Partition(); StorageDescriptor sd1 = new StorageDescriptor(); List<FieldSchema> cols1 = new ArrayList<>(); @@ -645,8 +609,8 @@ public class TestCachedStore { part3.setValues(Arrays.asList("201703")); Partition newPart1 = new Partition(); - newPart1.setDbName("db1"); - newPart1.setTableName("tbl1"); + newPart1.setDbName(dbName); + newPart1.setTableName(tbl1Name); StorageDescriptor newSd1 = new StorageDescriptor(); List<FieldSchema> newCols1 = new ArrayList<>(); newCols1.add(new FieldSchema("newcol1", "int", "")); @@ -654,32 +618,25 @@ public class TestCachedStore { newParams1.put("key", "value"); newSd1.setCols(newCols1); newSd1.setParameters(params1); - newSd1.setLocation("loc1"); + newSd1.setLocation("loc1new"); newPart1.setSd(newSd1); newPart1.setValues(Arrays.asList("201701")); - sharedCache.addPartitionToCache("db1", "tbl1", part1); - sharedCache.addPartitionToCache("db1", "tbl1", part2); - sharedCache.addPartitionToCache("db1", "tbl1", part3); - sharedCache.addPartitionToCache("db1", "tbl2", part1); - - Assert.assertEquals(sharedCache.getCachedPartitionCount(), 4); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); + sharedCache.addPartitionToCache(dbName, tbl1Name, part1); + sharedCache.addPartitionToCache(dbName, tbl1Name, part2); + sharedCache.addPartitionToCache(dbName, tbl1Name, part3); + sharedCache.addPartitionToCache(dbName, tbl2Name, part1); - Partition t = sharedCache.getPartitionFromCache("db1", "tbl1", Arrays.asList("201701")); + Partition t = sharedCache.getPartitionFromCache(dbName, tbl1Name, Arrays.asList("201701")); Assert.assertEquals(t.getSd().getLocation(), "loc1"); - sharedCache.removePartitionFromCache("db1", "tbl2", Arrays.asList("201701")); - Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); - - sharedCache.alterPartitionInCache("db1", "tbl1", Arrays.asList("201701"), newPart1); - Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3); - Assert.assertEquals(sharedCache.getSdCache().size(), 3); + sharedCache.removePartitionFromCache(dbName, tbl2Name, Arrays.asList("201701")); + t = sharedCache.getPartitionFromCache(dbName, tbl2Name, Arrays.asList("201701")); + Assert.assertNull(t); - sharedCache.removePartitionFromCache("db1", "tbl1", Arrays.asList("201702")); - Assert.assertEquals(sharedCache.getCachedPartitionCount(), 2); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); + sharedCache.alterPartitionInCache(dbName, tbl1Name, Arrays.asList("201701"), newPart1); + t = sharedCache.getPartitionFromCache(dbName, tbl1Name, Arrays.asList("201701")); + Assert.assertEquals(t.getSd().getLocation(), "loc1new"); } @Test @@ -753,10 +710,10 @@ public class TestCachedStore { String dbName = "testTableColStatsOps1"; String tblName = "tbl1"; String colName = "f1"; - + Database db = new Database(dbName, null, "some_location", null); cachedStore.createDatabase(db); - + List<FieldSchema> cols = new ArrayList<>(); cols.add(new FieldSchema(colName, "int", null)); List<FieldSchema> partCols = new ArrayList<>(); @@ -764,29 +721,29 @@ public class TestCachedStore { StorageDescriptor sd = new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()), null, null, null); - + Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), null, null, TableType.MANAGED_TABLE.toString()); cachedStore.createTable(tbl); - + List<String> partVals1 = new ArrayList<>(); partVals1.add("1"); List<String> partVals2 = new ArrayList<>(); partVals2.add("2"); - + Partition ptn1 = new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); cachedStore.addPartition(ptn1); Partition ptn2 = new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); cachedStore.addPartition(ptn2); - + ColumnStatistics stats = new ColumnStatistics(); ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); statsDesc.setPartName("col"); List<ColumnStatisticsObj> colStatObjs = new ArrayList<>(); - + ColumnStatisticsData data = new ColumnStatisticsData(); ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data); LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector(); @@ -796,15 +753,15 @@ public class TestCachedStore { longStats.setNumDVs(30); data.setLongStats(longStats); colStatObjs.add(colStats); - + stats.setStatsDesc(statsDesc); stats.setStatsObj(colStatObjs); - + cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1); - + longStats.setNumDVs(40); cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2); - + List<String> colNames = new ArrayList<>(); colNames.add(colName); List<String> aggrPartVals = new ArrayList<>(); @@ -823,10 +780,10 @@ public class TestCachedStore { String dbName = "testTableColStatsOps2"; String tblName = "tbl2"; String colName = "f1"; - + Database db = new Database(dbName, null, "some_location", null); cachedStore.createDatabase(db); - + List<FieldSchema> cols = new ArrayList<>(); cols.add(new FieldSchema(colName, "int", null)); List<FieldSchema> partCols = new ArrayList<>(); @@ -834,29 +791,29 @@ public class TestCachedStore { StorageDescriptor sd = new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()), null, null, null); - + Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), null, null, TableType.MANAGED_TABLE.toString()); cachedStore.createTable(tbl); - + List<String> partVals1 = new ArrayList<>(); partVals1.add("1"); List<String> partVals2 = new ArrayList<>(); partVals2.add("2"); - + Partition ptn1 = new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); cachedStore.addPartition(ptn1); Partition ptn2 = new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); cachedStore.addPartition(ptn2); - + ColumnStatistics stats = new ColumnStatistics(); ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); statsDesc.setPartName("col"); List<ColumnStatisticsObj> colStatObjs = new ArrayList<>(); - + ColumnStatisticsData data = new ColumnStatisticsData(); ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data); LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector(); @@ -864,21 +821,21 @@ public class TestCachedStore { longStats.setHighValue(100); longStats.setNumNulls(50); longStats.setNumDVs(30); - + HyperLogLog hll = HyperLogLog.builder().build(); hll.addLong(1); hll.addLong(2); hll.addLong(3); longStats.setBitVectors(hll.serialize()); - + data.setLongStats(longStats); colStatObjs.add(colStats); - + stats.setStatsDesc(statsDesc); stats.setStatsObj(colStatObjs); - + cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1); - + longStats.setNumDVs(40); hll = HyperLogLog.builder().build(); hll.addLong(2); @@ -886,9 +843,9 @@ public class TestCachedStore { hll.addLong(4); hll.addLong(5); longStats.setBitVectors(hll.serialize()); - + cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2); - + List<String> colNames = new ArrayList<>(); colNames.add(colName); List<String> aggrPartVals = new ArrayList<>(); @@ -901,4 +858,179 @@ public class TestCachedStore { Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 5); } + + @Test + public void testMultiThreadedSharedCacheOps() throws Exception { + List<String> dbNames = new ArrayList<String>(Arrays.asList("db1", "db2", "db3", "db4", "db5")); + List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(); + ExecutorService executor = Executors.newFixedThreadPool(50, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + } + }); + + // Create 5 dbs + for (String dbName : dbNames) { + Callable<Object> c = new Callable<Object>() { + public Object call() { + Database db = createTestDb(dbName, "user1"); + sharedCache.addDatabaseToCache(db); + return null; + } + }; + tasks.add(c); + } + executor.invokeAll(tasks); + for (String dbName : dbNames) { + Database db = sharedCache.getDatabaseFromCache(dbName); + Assert.assertNotNull(db); + Assert.assertEquals(dbName, db.getName()); + } + + // Created 5 tables under "db1" + List<String> tblNames = + new ArrayList<String>(Arrays.asList("tbl1", "tbl2", "tbl3", "tbl4", "tbl5")); + tasks.clear(); + for (String tblName : tblNames) { + FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + cols.add(col1); + cols.add(col2); + FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); + ptnCols.add(ptnCol1); + Callable<Object> c = new Callable<Object>() { + public Object call() { + Table tbl = createTestTbl(dbNames.get(0), tblName, "user1", cols, ptnCols); + sharedCache.addTableToCache(dbNames.get(0), tblName, tbl); + return null; + } + }; + tasks.add(c); + } + executor.invokeAll(tasks); + for (String tblName : tblNames) { + Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName); + Assert.assertNotNull(tbl); + Assert.assertEquals(tblName, tbl.getTableName()); + } + + // Add 5 partitions to all tables + List<String> ptnVals = new ArrayList<String>(Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee")); + tasks.clear(); + for (String tblName : tblNames) { + Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName); + for (String ptnVal : ptnVals) { + Map<String, String> partParams = new HashMap<String, String>(); + Callable<Object> c = new Callable<Object>() { + public Object call() { + Partition ptn = new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0, + tbl.getSd(), partParams); + sharedCache.addPartitionToCache(dbNames.get(0), tblName, ptn); + return null; + } + }; + tasks.add(c); + } + } + executor.invokeAll(tasks); + for (String tblName : tblNames) { + for (String ptnVal : ptnVals) { + Partition ptn = sharedCache.getPartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal)); + Assert.assertNotNull(ptn); + Assert.assertEquals(tblName, ptn.getTableName()); + Assert.assertEquals(tblName, ptn.getTableName()); + Assert.assertEquals(Arrays.asList(ptnVal), ptn.getValues()); + } + } + + // Drop all partitions from "tbl1", "tbl2", "tbl3" and add 2 new partitions to "tbl4" and "tbl5" + List<String> newPtnVals = new ArrayList<String>(Arrays.asList("fff", "ggg")); + List<String> dropPtnTblNames = new ArrayList<String>(Arrays.asList("tbl1", "tbl2", "tbl3")); + List<String> addPtnTblNames = new ArrayList<String>(Arrays.asList("tbl4", "tbl5")); + tasks.clear(); + for (String tblName : dropPtnTblNames) { + for (String ptnVal : ptnVals) { + Callable<Object> c = new Callable<Object>() { + public Object call() { + sharedCache.removePartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal)); + return null; + } + }; + tasks.add(c); + } + } + for (String tblName : addPtnTblNames) { + Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName); + for (String ptnVal : newPtnVals) { + Map<String, String> partParams = new HashMap<String, String>(); + Callable<Object> c = new Callable<Object>() { + public Object call() { + Partition ptn = new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0, + tbl.getSd(), partParams); + sharedCache.addPartitionToCache(dbNames.get(0), tblName, ptn); + return null; + } + }; + tasks.add(c); + } + } + executor.invokeAll(tasks); + for (String tblName : addPtnTblNames) { + for (String ptnVal : newPtnVals) { + Partition ptn = sharedCache.getPartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal)); + Assert.assertNotNull(ptn); + Assert.assertEquals(tblName, ptn.getTableName()); + Assert.assertEquals(tblName, ptn.getTableName()); + Assert.assertEquals(Arrays.asList(ptnVal), ptn.getValues()); + } + } + for (String tblName : dropPtnTblNames) { + List<Partition> ptns = sharedCache.listCachedPartitions(dbNames.get(0), tblName, 100); + Assert.assertEquals(0, ptns.size()); + } + sharedCache.getDatabaseCache().clear(); + sharedCache.getTableCache().clear(); + sharedCache.getSdCache().clear(); + } + + private Database createTestDb(String dbName, String dbOwner) { + String dbDescription = dbName; + String dbLocation = "file:/tmp"; + Map<String, String> dbParams = new HashMap<>(); + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + return db; + } + + private Table createTestTbl(String dbName, String tblName, String tblOwner, + List<FieldSchema> cols, List<FieldSchema> ptnCols) { + String serdeLocation = "file:/tmp"; + Map<String, String> serdeParams = new HashMap<>(); + Map<String, String> tblParams = new HashMap<>(); + SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>()); + StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, + serdeInfo, null, null, serdeParams); + sd.setStoredAsSubDirectories(false); + Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, + TableType.MANAGED_TABLE.toString()); + return tbl; + } + + // This method will return only after the cache has updated once + private void updateCache(CachedStore cachedStore) throws InterruptedException { + int maxTries = 100000; + long updateCountBefore = cachedStore.getCacheUpdateCount(); + // Start the CachedStore update service + CachedStore.startCacheUpdateService(cachedStore.getConf(), true, false); + while ((cachedStore.getCacheUpdateCount() != (updateCountBefore + 1)) && (maxTries-- > 0)) { + Thread.sleep(1000); + } + CachedStore.stopCacheUpdateService(100); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/resources/log4j2.properties b/standalone-metastore/src/test/resources/log4j2.properties index 365687e..db8a550 100644 --- a/standalone-metastore/src/test/resources/log4j2.properties +++ b/standalone-metastore/src/test/resources/log4j2.properties @@ -8,28 +8,64 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -name=PropertiesConfig -property.filename = logs -appenders = console +status = INFO +name = MetastoreLog4j2 +packages = org.apache.hadoop.hive.metastore +# list of properties +property.metastore.log.level = INFO +property.metastore.root.logger = DRFA +property.metastore.log.dir = ${sys:java.io.tmpdir}/${sys:user.name} +property.metastore.log.file = metastore.log +property.hive.perflogger.log.level = INFO + +# list of all appenders +appenders = console, DRFA + +# console appender appender.console.type = Console -appender.console.name = STDOUT +appender.console.name = console +appender.console.target = SYSTEM_ERR appender.console.layout.type = PatternLayout -appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n +appender.console.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n + +# daily rolling file appender +appender.DRFA.type = RollingRandomAccessFile +appender.DRFA.name = DRFA +appender.DRFA.fileName = ${sys:metastore.log.dir}/${sys:metastore.log.file} +# Use %pid in the filePattern to append <process-id>@<host-name> to the filename if you want separate log files for different CLI session +appender.DRFA.filePattern = ${sys:metastore.log.dir}/${sys:metastore.log.file}.%d{yyyy-MM-dd} +appender.DRFA.layout.type = PatternLayout +appender.DRFA.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n +appender.DRFA.policies.type = Policies +appender.DRFA.policies.time.type = TimeBasedTriggeringPolicy +appender.DRFA.policies.time.interval = 1 +appender.DRFA.policies.time.modulate = true +appender.DRFA.strategy.type = DefaultRolloverStrategy +appender.DRFA.strategy.max = 30 + +# list of all loggers +loggers = DataNucleus, Datastore, JPOX, PerfLogger + +logger.DataNucleus.name = DataNucleus +logger.DataNucleus.level = INFO + +logger.Datastore.name = Datastore +logger.Datastore.level = INFO + +logger.JPOX.name = JPOX +logger.JPOX.level = INFO -loggers=file -logger.file.name=guru.springframework.blog.log4j2properties -logger.file.level = debug -logger.file.appenderRefs = file -logger.file.appenderRef.file.ref = LOGFILE +logger.PerfLogger.name = org.apache.hadoop.hive.ql.log.PerfLogger +logger.PerfLogger.level = ${sys:hive.perflogger.log.level} -rootLogger.level = debug -rootLogger.appenderRefs = stdout -rootLogger.appenderRef.stdout.ref = STDOUT +# root logger +rootLogger.level = ${sys:metastore.log.level} +rootLogger.appenderRefs = root +rootLogger.appenderRef.root.ref = ${sys:metastore.root.logger}