This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new b2e260e  [FLINK-13160] HiveTableSource should implement PartitionableTableSource
b2e260e is described below

commit b2e260e3eb24c72bd397029f1b7fe73f7c0e8aae
Author: zjuwangg <zjuwa...@foxmail.com>
AuthorDate: Tue Jul 9 16:38:17 2019 +0800

    [FLINK-13160] HiveTableSource should implement PartitionableTableSource

    This PR implements PartitionableTableSource for HiveTableSource to support partition pruning.

    This closes #9032.
---
 .../batch/connectors/hive/HiveTableSource.java     | 78 +++++++++++++++++++---
 1 file changed, 68 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 19e46de..31b896f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.sources.PartitionableTableSource;
+import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;

 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -49,27 +52,47 @@ import java.util.Map;

 /**
  * A TableSource implementation to read data from Hive tables.
  */
-public class HiveTableSource extends InputFormatTableSource<Row> {
+public class HiveTableSource extends InputFormatTableSource<Row> implements PartitionableTableSource {

 	private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class);

 	private final JobConf jobConf;
 	private final ObjectPath tablePath;
 	private final CatalogTable catalogTable;
-	private List<HiveTablePartition> allPartitions;
+	private List<HiveTablePartition> allHivePartitions;
 	private String hiveVersion;
+	// partitionList represents all partitions as a list of partition-spec maps; used for partition pruning.
+	private List<Map<String, String>> partitionList = new ArrayList<>();
+	private Map<Map<String, String>, HiveTablePartition> partitionSpec2HiveTablePartition = new HashMap<>();
+	private boolean initAllPartitions;

 	public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
-		this.jobConf = jobConf;
-		this.tablePath = tablePath;
-		this.catalogTable = catalogTable;
+		this.jobConf = Preconditions.checkNotNull(jobConf);
+		this.tablePath = Preconditions.checkNotNull(tablePath);
+		this.catalogTable = Preconditions.checkNotNull(catalogTable);
 		this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+		initAllPartitions = false;
+	}
+
+	private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable,
+							List<HiveTablePartition> allHivePartitions,
+							String hiveVersion,
+							List<Map<String, String>> partitionList) {
+		this.jobConf = Preconditions.checkNotNull(jobConf);
+		this.tablePath = Preconditions.checkNotNull(tablePath);
+		this.catalogTable = Preconditions.checkNotNull(catalogTable);
+		this.allHivePartitions = allHivePartitions;
+		this.hiveVersion = hiveVersion;
+		this.partitionList = partitionList;
+		this.initAllPartitions = true;
 	}

 	@Override
 	public InputFormat getInputFormat() {
-		initAllPartitions();
-		return new HiveTableInputFormat(jobConf, catalogTable, allPartitions);
+		if (!initAllPartitions) {
+			initAllPartitions();
+		}
+		return new HiveTableInputFormat(jobConf, catalogTable, allHivePartitions);
 	}

 	@Override
@@ -87,8 +110,37 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
 		return DataTypes.ROW(fields);
 	}

+	@Override
+	public List<Map<String, String>> getPartitions() {
+		if (!initAllPartitions) {
+			initAllPartitions();
+		}
+		return partitionList;
+	}
+
+	@Override
+	public List<String> getPartitionFieldNames() {
+		return catalogTable.getPartitionKeys();
+	}
+
+	@Override
+	public TableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions) {
+		if (catalogTable.getPartitionKeys() == null || catalogTable.getPartitionKeys().size() == 0) {
+			return this;
+		} else {
+			List<HiveTablePartition> remainingHivePartitions = new ArrayList<>();
+			for (Map<String, String> partitionSpec : remainingPartitions) {
+				HiveTablePartition hiveTablePartition = partitionSpec2HiveTablePartition.get(partitionSpec);
+				Preconditions.checkNotNull(hiveTablePartition, String.format("remainingPartitions must contain " +
+						"partition spec %s", partitionSpec));
+				remainingHivePartitions.add(hiveTablePartition);
+			}
+			return new HiveTableSource(jobConf, tablePath, catalogTable, remainingHivePartitions, hiveVersion, partitionList);
+		}
+	}
+
 	private void initAllPartitions() {
-		allPartitions = new ArrayList<>();
+		allHivePartitions = new ArrayList<>();
 		// Please note that the following directly accesses Hive metastore, which is only a temporary workaround.
 		// Ideally, we need to go thru Catalog API to get all info we need here, which requires some major
 		// refactoring. We will postpone this until we merge Blink to Flink.
@@ -102,21 +154,27 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
 				for (Partition partition : partitions) {
 					StorageDescriptor sd = partition.getSd();
 					Map<String, Object> partitionColValues = new HashMap<>();
+					Map<String, String> partitionSpec = new HashMap<>();
 					for (int i = 0; i < partitionColNames.size(); i++) {
 						String partitionColName = partitionColNames.get(i);
 						String partitionValue = partition.getValues().get(i);
+						partitionSpec.put(partitionColName, partitionValue);
 						DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get();
 						Object partitionObject = restorePartitionValueFromFromType(partitionValue, type);
 						partitionColValues.put(partitionColName, partitionObject);
 					}
-					allPartitions.add(new HiveTablePartition(sd, partitionColValues));
+					HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues);
+					allHivePartitions.add(hiveTablePartition);
+					partitionList.add(partitionSpec);
+					partitionSpec2HiveTablePartition.put(partitionSpec, hiveTablePartition);
 				}
 			} else {
-				allPartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd(), null));
+				allHivePartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd(), null));
 			}
 		} catch (TException e) {
 			throw new FlinkHiveException("Failed to collect all partitions from Hive metastore", e);
 		}
+		initAllPartitions = true;
 	}

 	private Object restorePartitionValueFromFromType(String valStr, DataType type) {
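
For readers new to the PartitionableTableSource contract this patch wires up: the planner fetches every partition spec via getPartitions(), filters the specs against the query's partition predicates, and passes the survivors to applyPartitionPruning(), which returns a copy of the source scoped to those partitions. The self-contained sketch below walks through that handshake with plain Java collections; the PartitionPruningSketch class, the pt_year/pt_month column names, and the hard-coded predicate are illustrative stand-ins, not part of the patch.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PartitionPruningSketch {

        public static void main(String[] args) {
            // Pretend these specs came from HiveTableSource#getPartitions():
            // one Map<String, String> per partition, keyed by partition column name.
            List<Map<String, String>> allPartitions = new ArrayList<>();
            allPartitions.add(spec("pt_year", "2018", "pt_month", "05"));
            allPartitions.add(spec("pt_year", "2018", "pt_month", "06"));
            allPartitions.add(spec("pt_year", "2019", "pt_month", "01"));

            // A planner rule would derive this filter from the query's WHERE clause;
            // here we simply keep pt_year = '2018'.
            List<Map<String, String>> remaining = new ArrayList<>();
            for (Map<String, String> partitionSpec : allPartitions) {
                if ("2018".equals(partitionSpec.get("pt_year"))) {
                    remaining.add(partitionSpec);
                }
            }

            // The real caller would now invoke
            //   TableSource pruned = hiveTableSource.applyPartitionPruning(remaining);
            // and plan the scan against the returned copy, leaving the original source untouched.
            System.out.println("kept " + remaining.size() + " of " + allPartitions.size() + " partitions");
        }

        // Helper to build a two-column partition spec (hypothetical column names).
        private static Map<String, String> spec(String k1, String v1, String k2, String v2) {
            Map<String, String> m = new HashMap<>();
            m.put(k1, v1);
            m.put(k2, v2);
            return m;
        }
    }

Returning a new source rather than mutating the existing one keeps applyPartitionPruning() side-effect free, which is why the patch adds the private copy constructor.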
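
A second detail worth illustrating is the partitionSpec2HiveTablePartition index that initAllPartitions() builds: because java.util.Map implementations hash and compare keys by content, a spec map reconstructed independently by the planner looks up the same HiveTablePartition the source registered. A minimal sketch, with the hypothetical FakePartition standing in for HiveTablePartition:

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionIndexSketch {

        // Stand-in for HiveTablePartition; only carries a storage location here.
        static class FakePartition {
            final String location;

            FakePartition(String location) {
                this.location = location;
            }
        }

        public static void main(String[] args) {
            Map<Map<String, String>, FakePartition> specToPartition = new HashMap<>();

            // Register a partition under its spec, as initAllPartitions() does.
            Map<String, String> spec = new HashMap<>();
            spec.put("pt_year", "2018");
            specToPartition.put(spec, new FakePartition("hdfs:///warehouse/t/pt_year=2018"));

            // A structurally equal spec built elsewhere finds the same partition.
            Map<String, String> probe = new HashMap<>();
            probe.put("pt_year", "2018");
            System.out.println(specToPartition.get(probe).location);
        }
    }

The flip side of keying by map is that a probe must match a registered spec exactly; a spec the source never produced yields null, which is what the Preconditions.checkNotNull in applyPartitionPruning() turns into an explicit error.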