This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b2e260e  [FLINK-13160] HiveTableSource should implement PartitionableTableSource
b2e260e is described below

commit b2e260e3eb24c72bd397029f1b7fe73f7c0e8aae
Author: zjuwangg <zjuwa...@foxmail.com>
AuthorDate: Tue Jul 9 16:38:17 2019 +0800

    [FLINK-13160] HiveTableSource should implement PartitionableTableSource
    
    This PR implements PartitionableTableSource for HiveTableSource to support partition pruning.
    
    This closes #9032.
---
 .../batch/connectors/hive/HiveTableSource.java     | 78 +++++++++++++++++++---
 1 file changed, 68 insertions(+), 10 deletions(-)
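
For readers unfamiliar with the PartitionableTableSource contract: the planner asks the source for all of its partitions, filters those partition specs against the query's predicates on partition columns, and then asks the source for a pruned copy restricted to the survivors. Below is a minimal planner-side sketch of that flow; the helper name, the pt_year partition column, and the hard-coded predicate are hypothetical stand-ins for what the optimizer actually derives from the query.

    import org.apache.flink.batch.connectors.hive.HiveTableSource;
    import org.apache.flink.table.sources.TableSource;

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    // Hypothetical helper showing the planner-side pruning flow for a table
    // partitioned by a single "pt_year" column.
    static TableSource pruneToYear(HiveTableSource hiveTableSource, String year) {
            List<Map<String, String>> allSpecs = hiveTableSource.getPartitions();
            // Keep only the partitions matching e.g. WHERE pt_year = '2019'; the
            // real planner derives this predicate from the query.
            List<Map<String, String>> remaining = allSpecs.stream()
                            .filter(spec -> year.equals(spec.get("pt_year")))
                            .collect(Collectors.toList());
            // applyPartitionPruning returns a copy of the source that reads only
            // the surviving partitions; the original source is left unmodified.
            return hiveTableSource.applyPartitionPruning(remaining);
    }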

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 19e46de..31b896f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sources.InputFormatTableSource;
+import org.apache.flink.table.sources.PartitionableTableSource;
+import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -49,27 +52,47 @@ import java.util.Map;
 /**
  * A TableSource implementation to read data from Hive tables.
  */
-public class HiveTableSource extends InputFormatTableSource<Row> {
+public class HiveTableSource extends InputFormatTableSource<Row> implements PartitionableTableSource {
 
        private static Logger logger = LoggerFactory.getLogger(HiveTableSource.class);
 
        private final JobConf jobConf;
        private final ObjectPath tablePath;
        private final CatalogTable catalogTable;
-       private List<HiveTablePartition> allPartitions;
+       private List<HiveTablePartition> allHivePartitions;
        private String hiveVersion;
+       // partitionList represents all partitions as a list of partition-spec maps; it is used for partition pruning.
+       private List<Map<String, String>> partitionList = new ArrayList<>();
+       private Map<Map<String, String>, HiveTablePartition> partitionSpec2HiveTablePartition = new HashMap<>();
+       private boolean initAllPartitions;
 
        public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
-               this.jobConf = jobConf;
-               this.tablePath = tablePath;
-               this.catalogTable = catalogTable;
+               this.jobConf = Preconditions.checkNotNull(jobConf);
+               this.tablePath = Preconditions.checkNotNull(tablePath);
+               this.catalogTable = Preconditions.checkNotNull(catalogTable);
                this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+               initAllPartitions = false;
+       }
+
+       private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable,
+                                                       List<HiveTablePartition> allHivePartitions,
+                                                       String hiveVersion,
+                                                       List<Map<String, String>> partitionList) {
+               this.jobConf = Preconditions.checkNotNull(jobConf);
+               this.tablePath = Preconditions.checkNotNull(tablePath);
+               this.catalogTable = Preconditions.checkNotNull(catalogTable);
+               this.allHivePartitions = allHivePartitions;
+               this.hiveVersion = hiveVersion;
+               this.partitionList = partitionList;
+               this.initAllPartitions = true;
        }
 
        @Override
        public InputFormat getInputFormat() {
-               initAllPartitions();
-               return new HiveTableInputFormat(jobConf, catalogTable, allPartitions);
+               if (!initAllPartitions) {
+                       initAllPartitions();
+               }
+               return new HiveTableInputFormat(jobConf, catalogTable, allHivePartitions);
        }
 
        @Override
@@ -87,8 +110,37 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
                return DataTypes.ROW(fields);
        }
 
+       @Override
+       public List<Map<String, String>> getPartitions() {
+               if (!initAllPartitions) {
+                       initAllPartitions();
+               }
+               return partitionList;
+       }
+
+       @Override
+       public List<String> getPartitionFieldNames() {
+               return catalogTable.getPartitionKeys();
+       }
+
+       @Override
+       public TableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions) {
+               if (catalogTable.getPartitionKeys() == null || catalogTable.getPartitionKeys().size() == 0) {
+                       return this;
+               } else {
+                       List<HiveTablePartition> remainingHivePartitions = new ArrayList<>();
+                       for (Map<String, String> partitionSpec : remainingPartitions) {
+                               HiveTablePartition hiveTablePartition = partitionSpec2HiveTablePartition.get(partitionSpec);
+                               Preconditions.checkNotNull(hiveTablePartition, String.format("remainingPartitions must contain " +
+                                                                               "partition spec %s", partitionSpec));
+                               remainingHivePartitions.add(hiveTablePartition);
+                       }
+                       return new HiveTableSource(jobConf, tablePath, catalogTable, remainingHivePartitions, hiveVersion, partitionList);
+               }
+       }
+
        private void initAllPartitions() {
-               allPartitions = new ArrayList<>();
+               allHivePartitions = new ArrayList<>();
                // Please note that the following directly accesses Hive metastore, which is only a temporary workaround.
                // Ideally, we need to go thru Catalog API to get all info we need here, which requires some major
                // refactoring. We will postpone this until we merge Blink to Flink.
@@ -102,21 +154,27 @@ public class HiveTableSource extends InputFormatTableSource<Row> {
                                for (Partition partition : partitions) {
                                        StorageDescriptor sd = partition.getSd();
                                        Map<String, Object> partitionColValues = new HashMap<>();
+                                       Map<String, String> partitionSpec = new HashMap<>();
                                        for (int i = 0; i < partitionColNames.size(); i++) {
                                                String partitionColName = partitionColNames.get(i);
                                                String partitionValue = partition.getValues().get(i);
+                                               partitionSpec.put(partitionColName, partitionValue);
                                                DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get();
                                                Object partitionObject = restorePartitionValueFromFromType(partitionValue, type);
                                                partitionColValues.put(partitionColName, partitionObject);
                                        }
-                                       allPartitions.add(new HiveTablePartition(sd, partitionColValues));
+                                       HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues);
+                                       allHivePartitions.add(hiveTablePartition);
+                                       partitionList.add(partitionSpec);
+                                       partitionSpec2HiveTablePartition.put(partitionSpec, hiveTablePartition);
                                }
                        } else {
-                               allPartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd(), null));
+                               allHivePartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd(), null));
                        }
                } catch (TException e) {
                        throw new FlinkHiveException("Failed to collect all partitions from hive metaStore", e);
                }
+               initAllPartitions = true;
        }
 
        private Object restorePartitionValueFromFromType(String valStr, DataType type) {
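
As an aside on the data structures introduced above: each partition spec is a map from partition column name to its string value, and partitionSpec2HiveTablePartition keys the metastore-derived HiveTablePartition objects by exactly those maps, which is how applyPartitionPruning resolves each surviving spec back to something readable. A hypothetical example, assuming a table partitioned by (pt_year, pt_month):

    // Hypothetical spec identifying the partition pt_year=2019/pt_month=07.
    Map<String, String> spec = new HashMap<>();
    spec.put("pt_year", "2019");
    spec.put("pt_month", "07");

    // partitionSpec2HiveTablePartition.get(spec) then yields the HiveTablePartition
    // whose StorageDescriptor points at that partition's data files.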
