[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r283461610 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +96,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws CatalogException thrown if the input base table is invalid. +*/ + protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) + throws CatalogException; + + /** +* Create a CatalogBaseTable from a Hive table. +* +* @param hiveTable a Hive table +* @return a CatalogBaseTable +*/ + protected abstract CatalogBaseTable createCatalogBaseTable(Table hiveTable); Review comment: I think we can just name the method createCatalogTable(), which sounds more general and is a direct counterpart of createHiveTable(). This is very minor, though. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r283457038 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) Review comment: If you're concerned about giving the user precise, useful information, throwing an exception isn't the best choice; logging is probably better for that. Nevertheless, CatalogException certainly sounds better, though it doesn't change the nature of the problem. Let's do it this way for now.
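A minimal sketch of the validation style agreed on here, assuming simplified stand-in types: `CatalogException`, `CatalogBaseTable`, `CatalogTable` and `CatalogView` below are hypothetical single-file replacements for the real Flink classes, and the "must be a CatalogTable" rule is only an example. The point it illustrates is that the thrown exception names the offending type, which goes some way toward giving the user precise information.

```java
import java.util.Objects;

class ValidationDemo {
    // Rejects anything that is not a CatalogTable and names the actual type,
    // so the caller gets an actionable message rather than a bare failure.
    static void validateCatalogBaseTable(CatalogBaseTable table) {
        Objects.requireNonNull(table, "table cannot be null");
        if (!(table instanceof CatalogTable)) {
            throw new CatalogException(String.format(
                "Expected a %s but got %s",
                CatalogTable.class.getSimpleName(), table.getClass().getSimpleName()));
        }
    }

    public static void main(String[] args) {
        validateCatalogBaseTable(new CatalogTable()); // passes silently
        try {
            validateCatalogBaseTable(new CatalogView());
        } catch (CatalogException e) {
            System.out.println(e.getMessage()); // Expected a CatalogTable but got CatalogView
        }
    }
}

// Hypothetical stand-ins, not the real org.apache.flink.table.catalog types.
class CatalogException extends RuntimeException {
    CatalogException(String message) {
        super(message);
    }
}

interface CatalogBaseTable {}

class CatalogTable implements CatalogBaseTable {}

class CatalogView implements CatalogBaseTable {}
```

Because the stand-in exception is unchecked (as Flink's own CatalogException is a runtime exception), callers are not forced to catch it, which matches the reviewer's point that switching exception types doesn't change the nature of the problem.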
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r283454112 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
+ // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { Review comment: Yeah. The in-memory catalog currently does that because it's achievable there, which might not be the case for Hive. There is no consistency requirement among external catalogs. Nevertheless, I think we should check whether Hive allows it. It's fine to do this in a separate JIRA.
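To illustrate the kind of check under discussion (the FLINK-12452 TODO elsewhere in this PR: alterTable() should ensure the existing base table and the new one are of the same type), here is a hypothetical in-memory sketch. `InMemoryCatalogSketch`, `TableEntry` and `ViewEntry` are invented for illustration; whether Hive's metastore permits an equivalent check is exactly the open question raised in the comment.

```java
import java.util.HashMap;
import java.util.Map;

class SameTypeAlterDemo {
    public static void main(String[] args) {
        InMemoryCatalogSketch catalog = new InMemoryCatalogSketch();
        catalog.createTable("db.t", new TableEntry());
        catalog.alterTable("db.t", new TableEntry()); // same type: allowed
        try {
            catalog.alterTable("db.t", new ViewEntry()); // table -> view: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical stand-ins for a catalog's base-table hierarchy.
interface BaseEntry {}
class TableEntry implements BaseEntry {}
class ViewEntry implements BaseEntry {}

class InMemoryCatalogSketch {
    private final Map<String, BaseEntry> tables = new HashMap<>();

    void createTable(String path, BaseEntry entry) {
        tables.put(path, entry);
    }

    // The existing entry is held in memory, so comparing its concrete type with
    // the replacement is trivial. Whether the same is feasible against Hive is
    // the question left for a separate JIRA.
    void alterTable(String path, BaseEntry newEntry) {
        BaseEntry existing = tables.get(path);
        if (existing == null) {
            throw new IllegalArgumentException("Table " + path + " does not exist");
        }
        if (existing.getClass() != newEntry.getClass()) {
            throw new IllegalArgumentException(String.format(
                "Cannot alter %s: existing type %s differs from new type %s",
                path, existing.getClass().getSimpleName(), newEntry.getClass().getSimpleName()));
        }
        tables.put(path, newEntry);
    }

    BaseEntry get(String path) {
        return tables.get(path);
    }
}
```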
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282712487 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogBaseTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
+ // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateCatalogBaseTable(newCatalogTable); + + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + + client.alter_table
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282711429 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +332,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); } + // -- utils -- + + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) { Review comment: The difference is that a static method doesn't require an instance of the class to be invoked. For example, if another static method of this class needs to call this method and this one isn't static, an instance of the class has to be created just to invoke it. The general principle: if a method doesn't access any instance variables or methods, make it static.
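A minimal sketch of that principle, assuming a hypothetical prefix value of `"flink."` (the diff shows only the constant's name) and an invented helper `countFlinkProperties`. Neither method touches instance state, so both are static, and one calls the other without creating an object.

```java
import java.util.HashMap;
import java.util.Map;

class StaticHelperDemo {
    // Hypothetical value; the real FLINK_PROPERTY_PREFIX is not shown in the diff.
    static final String FLINK_PROPERTY_PREFIX = "flink.";

    // Uses no instance fields or methods, so it can be static ...
    static boolean isFlinkProperty(String key) {
        return key.startsWith(FLINK_PROPERTY_PREFIX);
    }

    // ... which lets another static method call it directly, with no instance.
    static int countFlinkProperties(Map<String, String> params) {
        int count = 0;
        for (String key : params.keySet()) {
            if (isFlinkProperty(key)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("flink.connector.type", "filesystem");
        params.put("transient_lastDdlTime", "0"); // a Hive-created parameter
        System.out.println(countFlinkProperties(params)); // 1
    }
}
```

Had `isFlinkProperty` been an instance method, `countFlinkProperties` would need something like `new StaticHelperDemo().isFlinkProperty(key)`, constructing an object purely to reach a method that never uses it.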
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282710918 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) Review comment: I'm not backtracking on what I said earlier. It's fine to define an API, but the API needs to be well thought out, which doesn't seem to be the case here. That's why I said we can forgo the API. If we think an API is the way to go, then let's define it better; the parent class shouldn't define what validation a subclass might do. Personally, I don't feel the API is sufficiently sound, but feel free to get a second opinion.
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282322682 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogBaseTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
+ // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateCatalogBaseTable(newCatalogTable); + + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + + client.alter_table
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282321982 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) Review comment: Even though this is not a public API, I feel it should be more formal. First, we should probably let this return true or false as the validation result, rather than relying on throwing IllegalArgumentException. Second, validation can be more complicated than just checking the instance type. At this level, the base class shouldn't care what kind of validation the subclasses might do, or whether and how they do it. So it's okay if we remove this and let each subclass do its own validation when creating or altering a table. This way might be even cleaner.
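A sketch combining both suggestions under stated assumptions: the base class drops the abstract validation hook entirely, and the subclass validates inside its own create path using a boolean-returning `isValid` check, leaving the reaction (here, an exception) to the caller. `BaseCatalog`, `OnlyTablesCatalog`, `PlainTable` and `PlainView` are all hypothetical stand-ins, not classes from the PR.

```java
import java.util.HashMap;
import java.util.Map;

class SubclassValidationDemo {
    public static void main(String[] args) {
        OnlyTablesCatalog catalog = new OnlyTablesCatalog();
        catalog.createTable("db.t", new PlainTable());
        try {
            catalog.createTable("db.v", new PlainView());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}

interface BaseTable {}
class PlainTable implements BaseTable {}
class PlainView implements BaseTable {}

// Hypothetical base class: it stores tables but imposes no validation contract.
abstract class BaseCatalog {
    protected final Map<String, BaseTable> tables = new HashMap<>();

    protected void storeTable(String path, BaseTable table) {
        tables.put(path, table);
    }

    abstract void createTable(String path, BaseTable table);
}

// The subclass alone decides what "valid" means; the check just reports a boolean.
class OnlyTablesCatalog extends BaseCatalog {
    static boolean isValid(BaseTable table) {
        return table instanceof PlainTable;
    }

    @Override
    void createTable(String path, BaseTable table) {
        if (!isValid(table)) {
            throw new IllegalArgumentException("Only tables are supported: " + path);
        }
        storeTable(path, table);
    }
}
```

Keeping the check in the subclass means the parent never has to anticipate what validation a particular catalog needs, which is the cleanliness argument made in the comment.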
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282322716 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
+ // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable); + } + } catch (TException e) { + throw new CatalogException( + S
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282320563 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +332,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); } + // -- utils -- + + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) { + return hiveTableParams.entrySet().stream() + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) + .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue())); + } + + /** +* Add a prefix to Flink-created properties to distinguish them from Hive-created properties. +*/ + private Map<String, String> maskFlinkProperties(Map<String, String> properties) { Review comment: Static?
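A sketch of the two helpers written as static methods, as suggested, assuming `FLINK_PROPERTY_PREFIX` is `"flink."` (hypothetical; the actual value is not in the diff). It strips the prefix with `substring()` instead of the `String.replace()` call shown in the diff, because `replace()` removes every occurrence of the prefix in the key, not just the leading one. Treat that as a suggested variant, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

class PropertyPrefixDemo {
    // Hypothetical value; the diff only shows the constant's name.
    static final String FLINK_PROPERTY_PREFIX = "flink.";

    // Prefix every Flink-created key so it can be told apart from Hive's own parameters.
    static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
        return properties.entrySet().stream()
            .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue));
    }

    // Keep only prefixed keys and strip just the leading prefix.
    static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
        return hiveTableParams.entrySet().stream()
            .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
            .collect(Collectors.toMap(
                e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> flinkProps = new HashMap<>();
        flinkProps.put("connector.type", "filesystem");
        flinkProps.put("format.flink.option", "x"); // replace() would corrupt this key

        Map<String, String> hiveParams = new HashMap<>(maskFlinkProperties(flinkProps));
        hiveParams.put("transient_lastDdlTime", "0"); // a Hive-created parameter

        System.out.println(retrieveFlinkProperties(hiveParams).equals(flinkProps)); // true
    }
}
```

With `replace()`, the masked key `flink.format.flink.option` would come back as `format.option`, so the round trip would no longer be lossless.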
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282320535 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +332,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); } + // -- utils -- + + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) { Review comment: Static?
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282206462 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
+ // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable); + } + } catch (TException e) { + throw new CatalogException( + S
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282205173 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
+ // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { Review comment: Do we need to validate newCatalogTable?
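One way to answer this is for the base class to validate the new definition in alterTable() the same way createTable() in this diff does, so subclasses only supply the check. The sketch below makes a few assumptions. `TableDef`, `ValidatingCatalogBase`, and the local `CatalogException` are hypothetical simplified stand-ins, not Flink's classes. An in-memory map replaces the metastore.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's CatalogBaseTable.
interface TableDef {
    Map<String, String> getProperties();
}

// Hypothetical stand-in for Flink's CatalogException.
class CatalogException extends RuntimeException {
    CatalogException(String message) {
        super(message);
    }
}

abstract class ValidatingCatalogBase {
    // In-memory map standing in for the Hive metastore, keyed by "db.table".
    protected final Map<String, TableDef> tables = new HashMap<>();

    // Subclass hook: throw CatalogException if the table is not acceptable.
    protected abstract void validateCatalogBaseTable(TableDef table);

    // Validate the new definition first, mirroring createTable(),
    // so an invalid table never reaches the store.
    public void alterTable(String tablePath, TableDef newTable, boolean ignoreIfNotExists) {
        validateCatalogBaseTable(newTable);

        if (!tables.containsKey(tablePath)) {
            if (!ignoreIfNotExists) {
                throw new CatalogException("Table " + tablePath + " does not exist");
            }
            return;
        }
        tables.put(tablePath, newTable);
    }
}
```

Because validation runs before the existence check, an invalid definition is rejected even when `ignoreIfNotExists` would otherwise make the call a no-op.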
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282203125 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + public abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) Review comment: Use "protected" if we don't expect external callers.
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282205441 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
+ // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { Review comment: Also, how do we ensure that alterTable() cannot change a table into a view, or vice versa?
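A possible guard, assuming the catalog can fetch the existing object before altering it: check whether the existing and new objects are both views or both tables, and reject the alter otherwise. `BaseTable`, `SimpleTable`, `SimpleView`, and `AlterGuard` are hypothetical stand-ins. With the real Flink types the check would presumably test `instanceof CatalogView`, and the PR may prefer `CatalogException` over the `IllegalArgumentException` used here.

```java
// Hypothetical stand-ins for Flink's CatalogBaseTable, CatalogTable and CatalogView.
interface BaseTable {}

final class SimpleTable implements BaseTable {}

final class SimpleView implements BaseTable {}

final class AlterGuard {
    private AlterGuard() {}

    // True when both objects are views or both are (non-view) tables.
    static boolean sameKind(BaseTable existing, BaseTable replacement) {
        return (existing instanceof SimpleView) == (replacement instanceof SimpleView);
    }

    // Call before the drop + create in alterTable(): refuses to turn a
    // table into a view, or a view into a table.
    static void checkSameKind(String tablePath, BaseTable existing, BaseTable replacement) {
        if (existing == null || replacement == null) {
            throw new IllegalArgumentException("existing and replacement must not be null");
        }
        if (!sameKind(existing, replacement)) {
            throw new IllegalArgumentException(String.format(
                "Cannot alter %s: changing a table into a view, or vice versa, is not supported",
                tablePath));
        }
    }
}
```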
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282203601 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + public abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) + throws IllegalArgumentException; + + /** +* Create a CatalogBaseTable from a Hive table. +* +* @param hiveTable a Hive table +* @return a CatalogBaseTable +*/ + public abstract CatalogBaseTable createCatalogTable(Table hiveTable); + + /** +* Create a Hive table from a CatalogBaseTable. +* +* @param tablePath path of the table +* @param table a CatalogBaseTable +* @return a Hive table +*/ + public abstract Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table); Review comment: Same as above.
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282203452 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + public abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) + throws IllegalArgumentException; + + /** +* Create a CatalogBaseTable from a Hive table. +* +* @param hiveTable a Hive table +* @return a CatalogBaseTable +*/ + public abstract CatalogBaseTable createCatalogTable(Table hiveTable); Review comment: Can we use the word "convertTo" instead of "create"?
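With the renaming applied, the translation hooks could look like the sketch below. They are protected, as suggested earlier in this review, named `convertTo...`, and called only from shared base-class code such as getTable(). All types here are hypothetical simplifications: `HiveTableStub`, `FlinkTable`, `HiveCatalogBaseSketch`, and `PassThroughCatalog`. `IllegalStateException` stands in for `TableNotExistException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Hive's metastore Table.
final class HiveTableStub {
    final String path;
    final Map<String, String> parameters;

    HiveTableStub(String path, Map<String, String> parameters) {
        this.path = path;
        this.parameters = parameters;
    }
}

// Hypothetical stand-in for Flink's CatalogBaseTable.
interface FlinkTable {
    Map<String, String> getProperties();
}

abstract class HiveCatalogBaseSketch {
    // In-memory map standing in for the metastore client.
    protected final Map<String, HiveTableStub> metastore = new HashMap<>();

    // Shared by every subclass: look up the Hive table, then translate it.
    public FlinkTable getTable(String tablePath) {
        HiveTableStub hiveTable = metastore.get(tablePath);
        if (hiveTable == null) {
            throw new IllegalStateException("Table " + tablePath + " does not exist");
        }
        return convertToCatalogTable(hiveTable);
    }

    // Translation hooks: protected, because only the base class calls them.
    protected abstract FlinkTable convertToCatalogTable(HiveTableStub hiveTable);

    protected abstract HiveTableStub convertToHiveTable(String tablePath, FlinkTable table);
}

// Example subclass that copies properties across unchanged.
class PassThroughCatalog extends HiveCatalogBaseSketch {
    @Override
    protected FlinkTable convertToCatalogTable(HiveTableStub hiveTable) {
        Map<String, String> properties = new HashMap<>(hiveTable.parameters);
        return () -> properties;
    }

    @Override
    protected HiveTableStub convertToHiveTable(String tablePath, FlinkTable table) {
        return new HiveTableStub(tablePath, new HashMap<>(table.getProperties()));
    }
}
```

The "convertTo" prefix signals that the hooks translate between representations without touching the metastore. That keeps them distinct from the public createTable()/getTable() operations.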
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282201837 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +333,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); } + // -- utils -- + + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) { + return hiveTableParams.entrySet().stream() + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) + .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue())); + } + + /** +* Add a prefix to Flink-created properties to distinguish them from Hive-created properties. +*/ + private Map<String, String> buildFlinkProperties(Map<String, String> properties) { Review comment: Maybe we can use the words "mask" and "unmask" instead of "build" and "retrieve".
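A sketch of the proposed mask/unmask pair, assuming the prefix is a plain string constant. The value `"flink."` below is an assumption for illustration, not necessarily the PR's `FLINK_PROPERTY_PREFIX`. The sketch strips the prefix with `substring()` rather than `String.replace()`, because `replace()` removes every occurrence of the prefix in the key, not only the leading one.

```java
import java.util.Map;
import java.util.stream.Collectors;

final class FlinkPropertyMasking {
    // Assumed prefix value, for illustration only.
    static final String FLINK_PROPERTY_PREFIX = "flink.";

    private FlinkPropertyMasking() {}

    // mask: prefix every Flink property so it can be told apart from
    // properties that Hive itself writes into the table parameters.
    static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
        return properties.entrySet().stream()
            .collect(Collectors.toMap(
                e -> FLINK_PROPERTY_PREFIX + e.getKey(),
                Map.Entry::getValue));
    }

    // unmask: keep only prefixed (Flink-created) entries and remove the
    // leading prefix; substring() leaves later occurrences in the key intact.
    static Map<String, String> unmaskFlinkProperties(Map<String, String> hiveTableParams) {
        return hiveTableParams.entrySet().stream()
            .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
            .collect(Collectors.toMap(
                e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()),
                Map.Entry::getValue));
    }
}
```

For example, a Flink key `a.flink.b` masks to `flink.a.flink.b`. Unmasking with `substring()` restores `a.flink.b`, whereas `replace()` would yield `a.b`.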
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282199916 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - try { - client.dropTable( - tablePath.getDatabaseName(), - tablePath.getObjectName(), - // Indicate whether associated data should be deleted. - // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary - true, - ignoreIfNotExists); - } catch (NoSuchObjectException e) { - if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to drop table %s", tablePath.getFullName()), e); + public void validateCatalogBaseTable(CatalogBaseTable table) throws IllegalArgumentException { + // TODO: invalidate HiveCatalogView + if (table instanceof HiveCatalogTable) { + throw new IllegalArgumentException( + "Please use HiveCatalog to operate on HiveCatalogTable and HiveCatalogView."); } } @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - try { - // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly - if (tableExists(tablePath)) { - ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); - // alter_table() doesn't throw a clear exception when new table already exists. 
Thus, check the table existence explicitly - if (tableExists(newPath)) { - throw new TableAlreadyExistException(catalogName, newPath); - } else { - Table table = getHiveTable(tablePath); - table.setTableName(newTableName); - client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); - } - } else if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to rename table %s", tablePath.getFullName()), e); - } - } + public CatalogBaseTable createCatalogTable(Table hiveTable) { + // Table schema + TableSchema tableSchema = createTableSchema( + hiveTable.getSd().getCols(), hiveTable.getPartitionKeys()); - @Override - public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) - throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - if (!databaseExists(tablePath.getDatabaseName())) { - throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); - } else { - try { - client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table)); - } catch (AlreadyExistsException e) { - if (!ignoreIfExists) { - throw new TableAlreadyExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); - } + // Table properties + Map<String, String> properties =
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282198787 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - try { - client.dropTable( - tablePath.getDatabaseName(), - tablePath.getObjectName(), - // Indicate whether associated data should be deleted. - // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary - true, - ignoreIfNotExists); - } catch (NoSuchObjectException e) { - if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to drop table %s", tablePath.getFullName()), e); + public void validateCatalogBaseTable(CatalogBaseTable table) throws IllegalArgumentException { + // TODO: invalidate HiveCatalogView + if (table instanceof HiveCatalogTable) { Review comment: I'm wondering if we should instead specify what we expect, and throw an exception if that condition isn't met.
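A sketch of the allow-list style suggested here: state which types the catalog accepts and reject everything else, instead of rejecting one known-bad type. The classes are hypothetical stand-ins for the generic and Hive catalog table types in this PR. `IllegalArgumentException` matches the exception the quoted diff declares.

```java
// Hypothetical stand-ins for the catalog table types discussed in this PR.
interface CatalogObject {}

class GenericTable implements CatalogObject {}

class GenericView implements CatalogObject {}

class HiveTable implements CatalogObject {}

final class GenericCatalogValidation {
    private GenericCatalogValidation() {}

    // Allow-list: accept only the types this catalog knows how to store,
    // and reject everything else (including null) with a precise message.
    static void validateCatalogBaseTable(CatalogObject table) {
        if (!(table instanceof GenericTable) && !(table instanceof GenericView)) {
            throw new IllegalArgumentException(String.format(
                "Expected GenericTable or GenericView, but got %s",
                table == null ? "null" : table.getClass().getSimpleName()));
        }
    }
}
```

An allow-list also rejects types that are added later, such as the HiveCatalogView mentioned in the TODO, without another change to the validation code.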
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281861009 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- - @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); - } - @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - @Override - public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } + validateHiveCatalogTable(table); - @Override - public List<String> listTables(String databaseName) - throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + createHiveTable( + tablePath, + HiveCatalogUtil.createHiveTable(tablePath, table), + ignoreIfExists); } @Override - public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateHiveCatalogTable(newTable); + + super.alterTable(tablePath, newTable,
ignoreIfNotExists); } @Override - public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public CatalogBaseTable getTable(ObjectPath tablePath) Review comment: Similar comment here. I don't see that we need getTable() here. getTable() is the same for both catalogs, and the only difference is the translation, which can be captured in a new API.
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281860485 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- - @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); - } - @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - @Override - public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } + validateHiveCatalogTable(table); - @Override - public List<String> listTables(String databaseName) - throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + createHiveTable( + tablePath, + HiveCatalogUtil.createHiveTable(tablePath, table), + ignoreIfExists); } @Override - public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateHiveCatalogTable(newTable); Review comment: Similar to createTable(), I
don't see that we need to provide implementations of alterTable() in each subclass, because I don't really see much difference between them. If validation differs, we can define a new interface in the base class and have each subclass implement it.
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281860069 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- - @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); - } - @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) Review comment: I think we can keep the implementation only in HiveCatalogBase and define a new API called createHiveTable(). Each subclass then only needs to implement createHiveTable() (because they may create tables in different ways). I don't see why we need HiveCatalogUtil and related classes for this. This might be cleaner.
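A minimal sketch of that structure: createTable() lives once in the base class as a template method, and each subclass implements only the Hive-table construction. The sketch rests on several assumptions. Paths are plain "db.table" strings. `IllegalStateException` stands in for Flink's `DatabaseNotExistException` and `TableAlreadyExistException`. The `"flink."` key prefix in the example subclass is chosen for illustration. `HiveTableRecord`, `TemplateCatalogBase`, and `PrefixingCatalog` are all hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Hive's metastore Table.
final class HiveTableRecord {
    final String tablePath;
    final Map<String, String> parameters;

    HiveTableRecord(String tablePath, Map<String, String> parameters) {
        this.tablePath = tablePath;
        this.parameters = parameters;
    }
}

abstract class TemplateCatalogBase {
    // In-memory stand-ins for the metastore's databases and tables.
    protected final Set<String> databases = new HashSet<>();
    protected final Map<String, HiveTableRecord> metastore = new HashMap<>();

    // Template method: the create flow exists once, in the base class.
    // tablePath is assumed to have the form "db.table".
    public final void createTable(String tablePath, Map<String, String> properties, boolean ignoreIfExists) {
        String database = tablePath.substring(0, tablePath.indexOf('.'));
        if (!databases.contains(database)) {
            throw new IllegalStateException("Database " + database + " does not exist");
        }
        if (metastore.containsKey(tablePath)) {
            if (!ignoreIfExists) {
                throw new IllegalStateException("Table " + tablePath + " already exists");
            }
            return;
        }
        metastore.put(tablePath, createHiveTable(tablePath, properties));
    }

    // The only step that differs between catalogs.
    protected abstract HiveTableRecord createHiveTable(String tablePath, Map<String, String> properties);
}

// Example subclass: prefixes keys before storing them (prefix value assumed).
class PrefixingCatalog extends TemplateCatalogBase {
    @Override
    protected HiveTableRecord createHiveTable(String tablePath, Map<String, String> properties) {
        Map<String, String> parameters = new HashMap<>();
        properties.forEach((key, value) -> parameters.put("flink." + key, value));
        return new HiveTableRecord(tablePath, parameters);
    }
}
```

Marking createTable() `final` keeps subclasses from re-implementing the flow. That is the duplication the comment wants to avoid.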
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281857308 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveCatalogUtil.java ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive.util; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.hive.HiveCatalogDatabase; +import org.apache.flink.table.catalog.hive.HiveCatalogTable; +import org.apache.flink.table.catalog.hive.HiveTableConfig; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + + +/** + * Utils to convert meta objects between Flink and Hive for HiveCatalog. + */ +public class HiveCatalogUtil extends HiveCatalogBaseUtil { + + private HiveCatalogUtil() { + } + + // -- Utils -- Review comment: Nit: the class name already says it is a util class, so is this line still necessary?
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281856850 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java ## @@ -19,11 +19,15 @@ package org.apache.flink.table.catalog.hive; /** - * Configs for Flink tables stored in Hive metastore. + * Configs for tables in Hive metastore. */ public class HiveTableConfig { - // Description of the Flink table + // --- Review comment: Do the three lines (26-28) provide any information beyond the class name here?
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281856367 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.plan.stats.TableStats; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A Hive catalog table implementation. + */ +public class HiveCatalogTable implements CatalogTable { + // Schema of the table (column names and types) + private final TableSchema tableSchema; + // Partition keys if this is a partitioned table. 
It's an empty set if the table is not partitioned + private final List<String> partitionKeys; + // Properties of the table + private final Map<String, String> properties; + // Comment of the table + private String comment = "This is a hive catalog table."; + + public HiveCatalogTable( + TableSchema tableSchema, + List<String> partitionKeys, + Map<String, String> properties, + String comment) { + this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null"); + this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null"); + this.properties = checkNotNull(properties, "properties cannot be null"); + this.comment = comment; + } + + public HiveCatalogTable( + TableSchema tableSchema, + Map<String, String> properties, + String comment) { + this(tableSchema, new ArrayList<>(), properties, comment); + } + + @Override + public TableStats getStatistics() { + return new TableStats(0); + } + + @Override + public boolean isPartitioned() { + return !partitionKeys.isEmpty(); + } + + @Override + public List<String> getPartitionKeys() { + return partitionKeys; + } + + @Override + public Map<String, String> getProperties() { + return properties; + } + + @Override + public TableSchema getSchema() { + return tableSchema; + } + + @Override + public String getComment() { + return comment; + } + + @Override + public CatalogBaseTable copy() { + return new HiveCatalogTable( + tableSchema, new ArrayList<>(partitionKeys), new HashMap<>(properties), comment); Review comment: Don't we need to copy tableSchema?
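Whether the schema needs copying depends on whether `TableSchema` is immutable. If it is, sharing the instance is safe. If it is not, copy() should duplicate it along with the list and the map. A sketch with a hypothetical mutable `SchemaStub` standing in for `TableSchema`, and a simplified `TableStub` standing in for `HiveCatalogTable`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mutable stand-in for TableSchema.
final class SchemaStub {
    final String[] fieldNames;

    SchemaStub(String... fieldNames) {
        this.fieldNames = fieldNames;
    }

    SchemaStub copy() {
        // Strings are immutable, so cloning the array is a full copy.
        return new SchemaStub(fieldNames.clone());
    }
}

// Hypothetical simplified HiveCatalogTable.
final class TableStub {
    final SchemaStub schema;
    final List<String> partitionKeys;
    final Map<String, String> properties;
    final String comment;

    TableStub(SchemaStub schema, List<String> partitionKeys, Map<String, String> properties, String comment) {
        this.schema = schema;
        this.partitionKeys = partitionKeys;
        this.properties = properties;
        this.comment = comment;
    }

    // Every mutable component, including the schema, is duplicated, so
    // changes made through the copy cannot leak back into the original.
    TableStub copy() {
        return new TableStub(
            schema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(properties), comment);
    }
}
```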
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281854488 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +204,140 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + protected void createHiveTable(ObjectPath tablePath, Table table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(table); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { + try { + return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + } catch (NoSuchObjectException e) { + throw new TableNotExistException(catalogName, tablePath); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case + // Thus we have to translate alterTable() into (dropTable() + createTable()) + dropTable(tablePath, false); Review comment: We also need to note that views and tables share this call, so it probably doesn't make sense to change a view into a regular table or vice versa.
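Here is a minimal sketch of the kind of guard this comment suggests. It assumes the catalog can tell a view from a table by its type. `CatalogObject`, `TableObject`, `ViewObject`, and `AlterKindGuard` are hypothetical stand-ins, not Flink's `CatalogTable`/`CatalogView` interfaces, and the exception type is only illustrative.

```java
// Hypothetical stand-ins for a table/view split; not Flink's real interfaces.
interface CatalogObject {
    String name();
}

final class TableObject implements CatalogObject {
    private final String name;
    TableObject(String name) { this.name = name; }
    public String name() { return name; }
}

final class ViewObject implements CatalogObject {
    private final String name;
    ViewObject(String name) { this.name = name; }
    public String name() { return name; }
}

final class AlterKindGuard {
    private AlterKindGuard() {}

    private static String kindOf(CatalogObject obj) {
        return obj instanceof ViewObject ? "view" : "table";
    }

    // Because tables and views share alterTable(), reject any call that would
    // turn a view into a table or a table into a view.
    static void checkSameKind(CatalogObject existing, CatalogObject replacement) {
        String from = kindOf(existing);
        String to = kindOf(replacement);
        if (!from.equals(to)) {
            throw new IllegalArgumentException(String.format(
                    "Cannot alter %s '%s' into a %s", from, existing.name(), to));
        }
    }
}
```

The check would run after the existence check and before any metastore call, so a rejected alter leaves the catalog untouched.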
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281854134 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +204,140 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + protected void createHiveTable(ObjectPath tablePath, Table table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(table); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. 
Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { + try { + return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName()); + } catch (NoSuchObjectException e) { + throw new TableNotExistException(catalogName, tablePath); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case + // Thus we have to translate alterTable() into (dropTable() + createTable()) + dropTable(tablePath, false); Review comment: I think we need to be very careful here. dropTable() might also drop the data, whereas alterTable() is supposed to change only the metadata. We should probably solve the location problem rather than take this workaround.
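One reading of "solve the location problem" is to fill in the missing location from the existing table and then alter in place, so that only metadata changes. The sketch below models that against a hypothetical in-memory metastore. `HiveTableRecord`, `InMemoryMetastore`, and `MetadataOnlyAlter` are illustrative names, not Hive or Flink APIs. In Hive's metastore API the location is held on the table's `StorageDescriptor` (`Table.getSd().getLocation()`), so a real fix would presumably copy it there. That is an assumption, not something the PR does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Hive table definition (not Hive's Table class).
final class HiveTableRecord {
    final String name;
    String location; // may be null for a freshly converted definition
    final Map<String, String> properties = new HashMap<>();

    HiveTableRecord(String name, String location) {
        this.name = name;
        this.location = location;
    }
}

// Hypothetical in-memory metastore. Like the behavior described in the diff comment,
// alterTable() refuses a definition without a location.
final class InMemoryMetastore {
    private final Map<String, HiveTableRecord> tables = new HashMap<>();
    private final Map<String, String> files = new HashMap<>(); // location -> data, simulating storage

    void createTable(HiveTableRecord table, String data) {
        tables.put(table.name, table);
        files.put(table.location, data);
    }

    // In this stub, dropping a table also deletes its files: the risk raised in the review.
    void dropTable(String name) {
        HiveTableRecord removed = tables.remove(name);
        if (removed != null) files.remove(removed.location);
    }

    void alterTable(String name, HiveTableRecord newTable) {
        if (!tables.containsKey(name)) throw new IllegalStateException("No such table: " + name);
        if (newTable.location == null) throw new IllegalStateException("Table must have a valid location");
        tables.put(name, newTable); // metadata only; files are untouched
    }

    HiveTableRecord getTable(String name) { return tables.get(name); }
    String readData(String location) { return files.get(location); }
}

final class MetadataOnlyAlter {
    private MetadataOnlyAlter() {}

    // Fill in the missing location from the existing table and alter in place,
    // instead of dropTable() + createTable(), which could delete the data.
    static void alter(InMemoryMetastore metastore, String name, HiveTableRecord newTable) {
        HiveTableRecord existing = metastore.getTable(name);
        if (existing == null) throw new IllegalStateException("No such table: " + name);
        if (newTable.location == null) {
            newTable.location = existing.location;
        }
        metastore.alterTable(name, newTable);
    }
}
```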
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r281853049 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- - @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); - } - @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - @Override - public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } + validateHiveCatalogTable(table); - @Override - public List listTables(String databaseName) - throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + createHiveTable( + tablePath, + HiveCatalogUtil.createHiveTable(tablePath, table), + ignoreIfExists); } @Override - public List listViews(String databaseName) throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateHiveCatalogTable(newTable); Review comment: Wouldn't we want to do 
similar validation for GenericHiveMetastoreCatalog?
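If both catalogs are meant to validate, one option is for the shared base class to call an abstract validation hook from both `createTable()` and `alterTable()`, so that no subclass can skip it. This mirrors the abstract `validateCatalogBaseTable()` hook proposed for `HiveCatalogBase`. The sketch uses hypothetical stand-in types (`CatalogBaseStub`, `HiveCatalogStub`, `GenericCatalogStub`, and the two table stubs), not the PR's classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the table classes each catalog accepts.
interface BaseTableStub {}
final class HiveTableStub implements BaseTableStub {}
final class GenericTableStub implements BaseTableStub {}

// Hypothetical base class: both mutating entry points run the subclass's validation first.
abstract class CatalogBaseStub {
    private final Map<String, BaseTableStub> tables = new HashMap<>();

    protected abstract void validateCatalogBaseTable(BaseTableStub table);

    final void createTable(String path, BaseTableStub table) {
        validateCatalogBaseTable(table);
        if (tables.putIfAbsent(path, table) != null) {
            throw new IllegalStateException("Table already exists: " + path);
        }
    }

    final void alterTable(String path, BaseTableStub newTable) {
        validateCatalogBaseTable(newTable);
        if (!tables.containsKey(path)) {
            throw new IllegalStateException("Table does not exist: " + path);
        }
        tables.put(path, newTable);
    }

    final BaseTableStub getTable(String path) {
        return tables.get(path);
    }
}

final class HiveCatalogStub extends CatalogBaseStub {
    @Override
    protected void validateCatalogBaseTable(BaseTableStub table) {
        if (!(table instanceof HiveTableStub)) {
            throw new IllegalArgumentException("HiveCatalogStub only accepts HiveTableStub");
        }
    }
}

final class GenericCatalogStub extends CatalogBaseStub {
    @Override
    protected void validateCatalogBaseTable(BaseTableStub table) {
        if (!(table instanceof GenericTableStub)) {
            throw new IllegalArgumentException("GenericCatalogStub only accepts GenericTableStub");
        }
    }
}
```

Making `createTable()` and `alterTable()` final in the base class is the design choice that keeps validation from being skipped. The trade-off is that subclasses can customize only the hook, not the surrounding flow.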