This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6d7a15e [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog 6d7a15e is described below commit 6d7a15e328e8af6f001d6191c3f167bd8038bf14 Author: zjuwangg <zjuwa...@foxmail.com> AuthorDate: Wed Jun 12 00:53:00 2019 +0800 [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog This pull request makes HiveCatalog support Hive table stats related operations. This closes #8636. --- .../flink/table/catalog/hive/HiveCatalog.java | 80 ++++++++++++++++++++-- .../hive/client/HiveMetastoreClientWrapper.java | 11 +-- .../flink/table/catalog/hive/client/HiveShim.java | 14 ++++ .../table/catalog/hive/client/HiveShimV1.java | 10 +++ .../table/catalog/hive/client/HiveShimV2.java | 9 +++ .../table/catalog/hive/util/HiveStatsUtil.java | 2 + .../hive/HiveCatalogGenericMetadataTest.java | 8 +++ .../table/catalog/GenericInMemoryCatalog.java | 9 ++- .../apache/flink/table/catalog/CatalogTest.java | 79 +++++++++++++++++++++ 9 files changed, 208 insertions(+), 14 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index d83692b..fea0de4 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -59,6 +59,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.util.StringUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; @@ -826,7 +827,7 @@ public class HiveCatalog 
extends AbstractCatalog { } private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException { - if (hiveTable.getPartitionKeysSize() == 0) { + if (!isTablePartitioned(hiveTable)) { throw new TableNotPartitionedException(getName(), tablePath); } } @@ -1077,7 +1078,19 @@ public class HiveCatalog extends AbstractCatalog { @Override public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - + try { + Table hiveTable = getHiveTable(tablePath); + // Set table stats + if (compareAndUpdateStatisticsProperties(tableStatistics, hiveTable.getParameters())) { + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable); + } + } catch (TableNotExistException e) { + if (!ignoreIfNotExists) { + throw e; + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to alter table stats of table %s", tablePath.getFullName()), e); + } } @Override @@ -1099,9 +1112,50 @@ public class HiveCatalog extends AbstractCatalog { } } + /** + * Determines whether the statistics need to be updated and, if so, writes them into the given parameters. + * @param statistics new catalog table statistics. + * @param parameters original Hive table (or partition) parameters; updated in place when an update is needed. + * @return needUpdateStatistics flag which indicates whether the stats need to be updated.
+ */ + private static boolean compareAndUpdateStatisticsProperties(CatalogTableStatistics statistics, Map<String, String> parameters) { + boolean needUpdateStatistics; + String oldRowCount = parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST); + String oldTotalSize = parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST); + String oldNumFiles = parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST); + String oldRawDataSize = parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST); + needUpdateStatistics = statistics.getRowCount() != Long.parseLong(oldRowCount) || statistics.getTotalSize() != Long.parseLong(oldTotalSize) + || statistics.getFileCount() != Integer.parseInt(oldNumFiles) || statistics.getRawDataSize() != Long.parseLong(oldRawDataSize); + if (needUpdateStatistics) { + parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount())); + parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalSize())); + parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount())); + parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(statistics.getRawDataSize())); + } + return needUpdateStatistics; + } + + private static CatalogTableStatistics createCatalogTableStatistics(Map<String, String> parameters) { + long rowRount = Long.parseLong(parameters.getOrDefault(StatsSetupConst.ROW_COUNT, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST)); + long totalSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.TOTAL_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST)); + int numFiles = Integer.parseInt(parameters.getOrDefault(StatsSetupConst.NUM_FILES, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST)); + long rawDataSize = Long.parseLong(parameters.getOrDefault(StatsSetupConst.RAW_DATA_SIZE, HiveStatsUtil.DEFAULT_STATS_ZERO_CONST)); + return new 
CatalogTableStatistics(rowRount, numFiles, totalSize, rawDataSize); + } + @Override public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException { - + try { + Partition hivePartition = getHivePartition(tablePath, partitionSpec); + // Set table stats + if (compareAndUpdateStatisticsProperties(partitionStatistics, hivePartition.getParameters())) { + client.alter_partition(tablePath.getDatabaseName(), tablePath.getObjectName(), hivePartition); + } + } catch (TableNotExistException | PartitionSpecInvalidException e) { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e); + } catch (TException e) { + throw new CatalogException(String.format("Failed to alter table stats of table %s 's partition %s", tablePath.getFullName(), String.valueOf(partitionSpec)), e); + } } @Override @@ -1132,8 +1186,14 @@ public class HiveCatalog extends AbstractCatalog { } @Override - public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, + CatalogException { + Table hiveTable = getHiveTable(tablePath); + if (!isTablePartitioned(hiveTable)) { + return createCatalogTableStatistics(hiveTable.getParameters()); + } else { + return CatalogTableStatistics.UNKNOWN; + } } @Override @@ -1156,7 +1216,15 @@ public class HiveCatalog extends AbstractCatalog { @Override public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + Partition partition = getHivePartition(tablePath, partitionSpec); + return createCatalogTableStatistics(partition.getParameters()); + 
} catch (TableNotExistException | PartitionSpecInvalidException e) { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e); + } catch (TException e) { + throw new CatalogException(String.format("Failed to get partition stats of table %s 's partition %s", + tablePath.getFullName(), String.valueOf(partitionSpec)), e); + } } @Override diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java index 6f26ada..e1b25e6 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java @@ -137,11 +137,6 @@ public class HiveMetastoreClientWrapper implements AutoCloseable { client.createTable(table); } - public void alter_table(String databaseName, String tableName, Table table) - throws InvalidOperationException, MetaException, TException { - client.alter_table(databaseName, tableName, table); - } - public void createDatabase(Database database) throws InvalidObjectException, AlreadyExistsException, MetaException, TException { client.createDatabase(database); @@ -234,4 +229,10 @@ public class HiveMetastoreClientWrapper implements AutoCloseable { HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); return hiveShim.getFunction(client, databaseName, functionName); } + + public void alter_table(String databaseName, String tableName, Table table) + throws InvalidOperationException, MetaException, TException { + HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); + hiveShim.alterTable(client, databaseName, tableName, table); + } } diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java index b0eab75..320f078 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java @@ -24,7 +24,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.thrift.TException; @@ -83,4 +86,15 @@ public interface HiveShim { * @throws IOException if the file/directory cannot be properly moved or deleted */ boolean moveToTrash(FileSystem fs, Path path, Configuration conf, boolean purge) throws IOException; + + /** + * Alters a Hive table. 
+ * + * @param client the Hive metastore client + * @param databaseName the name of the database to which the table belongs + * @param tableName the name of the table to be altered + * @param table the new Hive table + */ + void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) + throws InvalidOperationException, MetaException, TException; } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java index f28fc5c..830d6e6 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java @@ -24,10 +24,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; @@ -95,4 +97,12 @@ public class HiveShimV1 implements HiveShim { throw new IOException("Failed to move " + path + " to trash", e); } } + + @Override + public void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException { + // For Hive-1.2.1, we need to tell HMS not to update stats. 
Otherwise, the stats we put in the table + // parameters can be overridden. The extra config we add here will be removed by HMS after it's used. + table.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true"); + client.alter_table(databaseName, tableName, table); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java index 58bb460..7df0aaf 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java @@ -29,7 +29,10 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.thrift.TException; @@ -86,4 +89,10 @@ public class HiveShimV2 implements HiveShim { throw new IOException("Failed to move " + path + " to trash", e); } } + + @Override + public void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException { + // For Hive-2.3.4, we don't need to tell HMS not to update stats. 
+ client.alter_table(databaseName, tableName, table); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java index 80baf72..97bfa56 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java @@ -62,6 +62,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class HiveStatsUtil { private static final Logger LOG = LoggerFactory.getLogger(HiveStatsUtil.class); + public static final String DEFAULT_STATS_ZERO_CONST = "0"; + private HiveStatsUtil() {} /** diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java index 5bc6ff6..83e0132 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java @@ -266,6 +266,14 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase { public void testListPartitionPartialSpec() throws Exception { } + @Override + public void testGetPartitionStats() throws Exception { + } + + @Override + public void testAlterPartitionTableStats() throws Exception { + } + // ------ test utils ------ @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java index 
9efce69..c0924ca 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java @@ -590,9 +590,12 @@ public class GenericInMemoryCatalog extends AbstractCatalog { if (!tableExists(tablePath)) { throw new TableNotExistException(getName(), tablePath); } - - CatalogTableStatistics result = tableStats.get(tablePath); - return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN; + if (!isPartitionedTable(tablePath)) { + CatalogTableStatistics result = tableStats.get(tablePath); + return result != null ? result.copy() : CatalogTableStatistics.UNKNOWN; + } else { + return CatalogTableStatistics.UNKNOWN; + } } @Override diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java index 0c2b632..357eb23 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java @@ -33,6 +33,7 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.junit.After; import org.junit.AfterClass; @@ -1056,6 +1057,84 @@ public abstract class CatalogTest { assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size()); } + + // ------ table and column stats ------ + + @Test + public void testGetTableStats_TableNotExistException() throws Exception{ + catalog.createDatabase(db1, createDb(), false); + 
exception.expect(org.apache.flink.table.catalog.exceptions.TableNotExistException.class); + catalog.getTableStatistics(path1); + } + + @Test + public void testGetPartitionStats() throws Exception{ + catalog.createDatabase(db1, createDb(), false); + catalog.createTable(path1, createPartitionedTable(), false); + catalog.createPartition(path1, createPartitionSpec(), createPartition(), false); + CatalogTableStatistics tableStatistics = catalog.getPartitionStatistics(path1, createPartitionSpec()); + assertEquals(0, tableStatistics.getFileCount()); + assertEquals(0, tableStatistics.getRawDataSize()); + assertEquals(0, tableStatistics.getTotalSize()); + assertEquals(0, tableStatistics.getRowCount()); + } + + @Test + public void testAlterTableStats() throws Exception{ + // Non-partitioned table + catalog.createDatabase(db1, createDb(), false); + CatalogTable table = createTable(); + catalog.createTable(path1, table, false); + CatalogTableStatistics tableStats = new CatalogTableStatistics(100, 10, 1000, 10000); + catalog.alterTableStatistics(path1, tableStats, false); + CatalogTableStatistics actual = catalog.getTableStatistics(path1); + + // we don't check fileCount and totalSize here for hive will automatically calc and set to real num. 
+ assertEquals(tableStats.getRowCount(), actual.getRowCount()); + assertEquals(tableStats.getRawDataSize(), actual.getRawDataSize()); + } + + @Test + public void testAlterTableStats_partitionedTable() throws Exception { + // alterTableStats() should do nothing for partitioned tables + // getTableStats() should return unknown column stats for partitioned tables + catalog.createDatabase(db1, createDb(), false); + CatalogTable catalogTable = createPartitionedTable(); + catalog.createTable(path1, catalogTable, false); + + CatalogTableStatistics stats = new CatalogTableStatistics(100, 1, 1000, 10000); + + catalog.alterTableStatistics(path1, stats, false); + + assertEquals(CatalogTableStatistics.UNKNOWN, catalog.getTableStatistics(path1)); + } + + @Test + public void testAlterPartitionTableStats() throws Exception { + catalog.createDatabase(db1, createDb(), false); + CatalogTable catalogTable = createPartitionedTable(); + catalog.createTable(path1, catalogTable, false); + CatalogPartitionSpec partitionSpec = createPartitionSpec(); + catalog.createPartition(path1, partitionSpec, createPartition(), true); + CatalogTableStatistics stats = new CatalogTableStatistics(100, 1, 1000, 10000); + catalog.alterPartitionStatistics(path1, partitionSpec, stats, false); + CatalogTableStatistics actual = catalog.getPartitionStatistics(path1, partitionSpec); + assertEquals(stats.getRowCount(), actual.getRowCount()); + assertEquals(stats.getRawDataSize(), actual.getRawDataSize()); + } + + @Test + public void testAlterTableStats_TableNotExistException() throws Exception { + exception.expect(TableNotExistException.class); + catalog.alterTableStatistics(new ObjectPath(catalog.getDefaultDatabase(), "nonexist"), CatalogTableStatistics.UNKNOWN, false); + } + + @Test + public void testAlterTableStats_TableNotExistException_ignore() throws Exception { + catalog.alterTableStatistics(new ObjectPath("non", "exist"), CatalogTableStatistics.UNKNOWN, true); + } + + // ------ utilities ------ /**