[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-13 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r283461610
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +96,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf 
hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws CatalogException thrown if the input base table is invalid.
+*/
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
+   throws CatalogException;
+
+   /**
+* Create a CatalogBaseTable from a Hive table.
+*
+* @param hiveTable a Hive table
+* @return a CatalogBaseTable
+*/
+   protected abstract CatalogBaseTable createCatalogBaseTable(Table 
hiveTable);
 
 Review comment:
   I think we can just name the method createCatalogTable() which sounds more 
general, and as a direct counterpart of createHiveTable(). However, this is 
very minor though.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-13 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r283457038
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   If you're concerned about giving user precise, useful information, throwing 
an exception isn't the best choice. For that, logging is probably better. 
Neverthelesss, CatalogException certainly sounds better, though it doesn't 
change the nature of the problem. Let's do this way for now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-13 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r283454112
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
 
 Review comment:
   Yeah. While inmemory is currently doing that, it's achievable, which might 
not be the case for Hive. This is no consistency required among external 
catalogs. Nevertheless, I think we should if check if Hive allows it. It's fine 
to do this in a separate JIRA.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-09 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282712487
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogBaseTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateCatalogBaseTable(newCatalogTable);
+
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   // TODO: [FLINK-12452] alterTable() in all 
catalogs should ensure existing base table and the new one are of the same type
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   client.alter_table

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-09 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282711429
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +332,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map retrieveFlinkProperties(Map 
hiveTableParams) {
 
 Review comment:
   The difference is that a static method doesn't require an instance of the 
class in order to invoke it. For example, if another static method of this 
class needs to call this method, without this one defined as static, one has to 
create an instance of the class to invoke the method. The general principle is 
that if a method doesn't access any of the object variables/methods, make it 
static.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-09 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282710918
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   I'm not backtracking what I said earlier. It's fine to defined an API, but 
the API needs to be well-thought, which doesn't seem to be the case here. 
That's why I said we can forgo the API. If we think an API is the way to go, 
then let's define the API better and parent class shouldn't define what 
validation a subclass might do. Personally, I don't feel the API is 
sufficiently sound, but feel free to get 2nd opinion though.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282322682
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogBaseTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateCatalogBaseTable(newCatalogTable);
+
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   // TODO: [FLINK-12452] alterTable() in all 
catalogs should ensure existing base table and the new one are of the same type
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   client.alter_table

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282321982
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   Even though this is not a public API, I feel it should be more formal. 
Specifically, we should probably can let this return true or false for 
validation result, rather than relying on throwing IllegalArgumentException. 
Secondly, validation can be more more complicated than just check the instance 
type. At this level, it shouldn't care what kind of validation the sub classes 
might do.
   
   I think it's acceptable if the base class doesn't care if the sub class does 
validation or how it does it. So it's okay if we remove this and let each 
subclass do its validation when create/alter table. This way might be even 
cleaner.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282322716
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), newTable);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   S

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282322682
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogBaseTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateCatalogBaseTable(newCatalogTable);
+
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   // TODO: [FLINK-12452] alterTable() in all 
catalogs should ensure existing base table and the new one are of the same type
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   client.alter_table

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282320563
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +332,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map retrieveFlinkProperties(Map 
hiveTableParams) {
+   return hiveTableParams.entrySet().stream()
+   .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+   .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+   }
+
+   /**
+* Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+*/
+   private Map maskFlinkProperties(Map 
properties) {
 
 Review comment:
   Static?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282320535
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +332,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map retrieveFlinkProperties(Map 
hiveTableParams) {
 
 Review comment:
   static?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282206462
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), newTable);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   S

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282205173
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
 
 Review comment:
   Do we need to validate newCatalogTable?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282203125
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   public abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   Use "protected" if we don't expect external callers.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282205441
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
 
 Review comment:
   Also, how do we ensure no alter for table to view and vise versa?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282203601
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   public abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
+   throws IllegalArgumentException;
+
+   /**
+* Create a CatalogBaseTable from a Hive table.
+*
+* @param hiveTable a Hive table
+* @return a CatalogBaseTable
+*/
+   public abstract CatalogBaseTable createCatalogTable(Table hiveTable);
+
+   /**
+* Create a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table a CatalogBaseTable
+* @return a Hive table
+*/
+   public abstract Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table);
 
 Review comment:
   Same as above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282203452
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   public abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
+   throws IllegalArgumentException;
+
+   /**
+* Create a CatalogBaseTable from a Hive table.
+*
+* @param hiveTable a Hive table
+* @return a CatalogBaseTable
+*/
+   public abstract CatalogBaseTable createCatalogTable(Table hiveTable);
 
 Review comment:
   Can we use word "convertTo" instead of "create"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282201837
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +333,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map retrieveFlinkProperties(Map 
hiveTableParams) {
+   return hiveTableParams.entrySet().stream()
+   .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+   .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+   }
+
+   /**
+* Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+*/
+   private Map buildFlinkProperties(Map 
properties) {
 
 Review comment:
   Maybe we can use words "mask" and "unmask" instead of "build" and "retrieve".


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282199916
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
// -- tables and views--
 
@Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   try {
-   client.dropTable(
-   tablePath.getDatabaseName(),
-   tablePath.getObjectName(),
-   // Indicate whether associated data should be 
deleted.
-   // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
-   true,
-   ignoreIfNotExists);
-   } catch (NoSuchObjectException e) {
-   if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
-   }
-   } catch (TException e) {
-   throw new CatalogException(
-   String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+   public void validateCatalogBaseTable(CatalogBaseTable table) throws 
IllegalArgumentException {
+   // TODO: invalidate HiveCatalogView
+   if (table instanceof HiveCatalogTable) {
+   throw new IllegalArgumentException(
+   "Please use HiveCatalog to operate on 
HiveCatalogTable and HiveCatalogView.");
}
}
 
@Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   try {
-   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
-   if (tableExists(tablePath)) {
-   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
-   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
-   if (tableExists(newPath)) {
-   throw new 
TableAlreadyExistException(catalogName, newPath);
-   } else {
-   Table table = getHiveTable(tablePath);
-   table.setTableName(newTableName);
-   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
-   }
-   } else if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
-   }
-   } catch (TException e) {
-   throw new CatalogException(
-   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
-   }
-   }
+   public CatalogBaseTable createCatalogTable(Table hiveTable) {
+   // Table schema
+   TableSchema tableSchema = createTableSchema(
+   hiveTable.getSd().getCols(), 
hiveTable.getPartitionKeys());
 
-   @Override
-   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   if (!databaseExists(tablePath.getDatabaseName())) {
-   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
-   } else {
-   try {
-   
client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, 
table));
-   } catch (AlreadyExistsException e) {
-   if (!ignoreIfExists) {
-   throw new 
TableAlreadyExistException(catalogName, tablePath);
-   }
-   } catch (TException e) {
-   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
-   }
+   // Table properties
+   Map properties = 

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282198787
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
// -- tables and views--
 
@Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   try {
-   client.dropTable(
-   tablePath.getDatabaseName(),
-   tablePath.getObjectName(),
-   // Indicate whether associated data should be 
deleted.
-   // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
-   true,
-   ignoreIfNotExists);
-   } catch (NoSuchObjectException e) {
-   if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
-   }
-   } catch (TException e) {
-   throw new CatalogException(
-   String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+   public void validateCatalogBaseTable(CatalogBaseTable table) throws 
IllegalArgumentException {
+   // TODO: invalidate HiveCatalogView
+   if (table instanceof HiveCatalogTable) {
 
 Review comment:
   I'm wondering if we should specify what we are expecting and throw 
exceptions if condition isn't met.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281861009
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
 
// -- tables and views--
 
-   @Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
 
-   @Override
-   public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
+   validateHiveCatalogTable(table);
 
-   @Override
-   public List listTables(String databaseName)
-   throws DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   createHiveTable(
+   tablePath,
+   HiveCatalogUtil.createHiveTable(tablePath, table),
+   ignoreIfExists);
}
 
@Override
-   public List listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateHiveCatalogTable(newTable);
+
+   super.alterTable(tablePath, newTable, ignoreIfNotExists);
}
 
@Override
-   public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public CatalogBaseTable getTable(ObjectPath tablePath)
 
 Review comment:
   Similar comment here. I don't see we need getTable() here. getTable() is the 
same for both catalogs, and the only diff is the translation, which can be 
captured in a new API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281860485
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
 
// -- tables and views--
 
-   @Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
 
-   @Override
-   public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
+   validateHiveCatalogTable(table);
 
-   @Override
-   public List listTables(String databaseName)
-   throws DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   createHiveTable(
+   tablePath,
+   HiveCatalogUtil.createHiveTable(tablePath, table),
+   ignoreIfExists);
}
 
@Override
-   public List listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateHiveCatalogTable(newTable);
 
 Review comment:
   Similar to createTable(), I don't see we need to provide implementations for 
alterTable() in each subclass because I don't really see much difference. If 
validation is different, then we can define a new interface in the base and 
have each subclass implement it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281860069
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
 
// -- tables and views--
 
-   @Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
 
 Review comment:
   I think we can keep the implementation only in HiveCatalogBase and define a 
new API called createHiveTable(). Each subcalss only needs to implement 
createHiveTable() (because they may create in different ways). I don't see why 
we need HiveCatalogUtil and related classes for this.
   
   This might be cleaner.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281857308
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveCatalogUtil.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.util;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.HiveCatalogDatabase;
+import org.apache.flink.table.catalog.hive.HiveCatalogTable;
+import org.apache.flink.table.catalog.hive.HiveTableConfig;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+
+/**
+ * Utils to convert meta objects between Flink and Hive for HiveCatalog.
+ */
+public class HiveCatalogUtil extends HiveCatalogBaseUtil {
+
+   private HiveCatalogUtil() {
+   }
+
+   // -- Utils --
 
 Review comment:
   Nit: The class name is called "utils", and is this line still necessary?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281856850
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
 ##
 @@ -19,11 +19,15 @@
 package org.apache.flink.table.catalog.hive;
 
 /**
- * Configs for Flink tables stored in Hive metastore.
+ * Configs for tables in Hive metastore.
  */
 public class HiveTableConfig {
 
-   // Description of the Flink table
+   // ---
 
 Review comment:
   Do the three lines (26-28) provide any additional info above the class name 
here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281856367
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Hive catalog table implementation.
+ */
+public class HiveCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Partition keys if this is a partitioned table. It's an empty set if 
the table is not partitioned
+   private final List partitionKeys;
+   // Properties of the table
+   private final Map properties;
+   // Comment of the table
+   private String comment = "This is a hive catalog table.";
+
+   public HiveCatalogTable(
+   TableSchema tableSchema,
+   List partitionKeys,
+   Map properties,
+   String comment) {
+   this.tableSchema = checkNotNull(tableSchema, "tableSchema 
cannot be null");
+   this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys 
cannot be null");
+   this.properties = checkNotNull(properties, "properties cannot 
be null");
+   this.comment = comment;
+   }
+
+   public HiveCatalogTable(
+   TableSchema tableSchema,
+   Map properties,
+   String comment) {
+   this(tableSchema, new ArrayList<>(), properties, comment);
+   }
+
+   @Override
+   public TableStats getStatistics() {
+   return new TableStats(0);
+   }
+
+   @Override
+   public boolean isPartitioned() {
+   return !partitionKeys.isEmpty();
+   }
+
+   @Override
+   public List getPartitionKeys() {
+   return partitionKeys;
+   }
+
+   @Override
+   public Map getProperties() {
+   return properties;
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   return tableSchema;
+   }
+
+   @Override
+   public String getComment() {
+   return comment;
+   }
+
+   @Override
+   public CatalogBaseTable copy() {
+   return new HiveCatalogTable(
+   tableSchema, new ArrayList<>(partitionKeys), new 
HashMap<>(properties), comment);
 
 Review comment:
   Don't we need to copy tableSchema?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281854488
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +204,140 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   protected void createHiveTable(ObjectPath tablePath, Table table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(table);
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   protected Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+   try {
+   return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+   } catch (NoSuchObjectException e) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } else {
+   // IMetastoreClient.alter_table() requires the table to 
have a valid location, which it doesn't in this case
+   // Thus we have to translate alterTable() into 
(dropTable() + createTable())
+   dropTable(tablePath, false);
 
 Review comment:
   We also need to note that view and table are sharing this call, so it 
probably doesn't make sense to change a view to a regular table and vise versa.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service,

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281854134
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +204,140 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   protected void createHiveTable(ObjectPath tablePath, Table table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(table);
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   protected Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+   try {
+   return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+   } catch (NoSuchObjectException e) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } else {
+   // IMetastoreClient.alter_table() requires the table to 
have a valid location, which it doesn't in this case
+   // Thus we have to translate alterTable() into 
(dropTable() + createTable())
+   dropTable(tablePath, false);
 
 Review comment:
   I think we needs to be very careful here. dropTable() might also drop the 
data, but alterTable() really is supposed to change the metadata only.
   
   We should probably solve the location problem rather than taking this 
workaround.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub a

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281853049
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
 
// -- tables and views--
 
-   @Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
 
-   @Override
-   public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
+   validateHiveCatalogTable(table);
 
-   @Override
-   public List listTables(String databaseName)
-   throws DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   createHiveTable(
+   tablePath,
+   HiveCatalogUtil.createHiveTable(tablePath, table),
+   ignoreIfExists);
}
 
@Override
-   public List listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateHiveCatalogTable(newTable);
 
 Review comment:
   Wouldn't we want to do similar validation for GenericHiveMetastoreCatalog?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services