bowenli86 commented on a change in pull request #8449: [FLINK-12235][hive]
Support Hive partition in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r286608694
##
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
##
@@ -616,44 +619,279 @@ private static Table instantiateHiveTable(ObjectPath
tablePath, CatalogBaseTabl
// -- partitions --
@Override
- public void createPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
- throws TableNotExistException,
TableNotPartitionedException, PartitionSpecInvalidException,
PartitionAlreadyExistsException, CatalogException {
- throw new UnsupportedOperationException();
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+         throws CatalogException {
+     // Reports whether getHivePartition can resolve the given spec for the given table.
+     // A missing table, a missing partition, or an invalid spec all yield "false";
+     // any other Thrift failure is rethrown as a CatalogException carrying the cause.
+     checkNotNull(tablePath, "Table path cannot be null");
+     checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+     final boolean found;
+     try {
+         found = getHivePartition(tablePath, partitionSpec) != null;
+     } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) {
+         // These lookups failures simply mean "no such partition" for the caller.
+         return false;
+     } catch (TException e) {
+         final String message =
+             String.format("Failed to get partition %s of table %s", partitionSpec, tablePath);
+         throw new CatalogException(message, e);
+     }
+     return found;
}
@Override
- public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, boolean ignoreIfNotExists)
- throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+         throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+     // Adds a partition to the given table through client.add_partition.
+     // If the partition already exists, the call is a no-op when ignoreIfExists is true and
+     // throws PartitionAlreadyExistsException otherwise. Any other Thrift failure is
+     // reported as a CatalogException.
+     checkNotNull(tablePath, "Table path cannot be null");
+     checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+     checkNotNull(partition, "Partition cannot be null");
+
+     checkArgument(partition instanceof HiveCatalogPartition, "Currently only supports HiveCatalogPartition");
+
+     Table hiveTable = getHiveTable(tablePath);
+
+     ensureTableAndPartitionMatch(hiveTable, partition);
+
+     ensurePartitionedTable(tablePath, hiveTable);
+
+     try {
+         client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
+     } catch (AlreadyExistsException e) {
+         if (!ignoreIfExists) {
+             throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+         }
+     } catch (TException e) {
+         // Pass the Thrift exception along as the cause; without it the underlying
+         // failure is lost and the error cannot be diagnosed from the stack trace.
+         throw new CatalogException(
+             String.format("Failed to create partition %s of table %s", partitionSpec, tablePath), e);
+     }
}
@Override
- public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, boolean ignoreIfNotExists)
throws PartitionNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be
null");
+ try {
+ Table hiveTable = getHiveTable(tablePath);
+ client.dropPartition(tablePath.getDatabaseName(),
tablePath.getObjectName(),
+ getOrderedFullPartitionValues(partitionSpec,
getFieldNames(hiveTable.getPartitionKeys()), tablePath), true);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new
PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+ }
+ } catch (MetaException | TableNotExistException |
PartitionSpecInvalidException e) {
+ throw new PartitionNotExistException(catalogName,
tablePath, partitionSpec, e);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to drop partition %s of
table %s", partitionSpec, tablePath));
+