This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 39d8236  [FLINK-12238][hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
39d8236 is described below

commit 39d82368eca3891d27c85dd9bc56db2344ee73ba
Author: Bowen L <bowenl...@gmail.com>
AuthorDate: Tue Apr 30 01:19:39 2019 -0700

    [FLINK-12238][hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
    
    This closes #8205
---
 flink-connectors/flink-connector-hive/pom.xml      | 429 +++++++++++++++++++++
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 352 +++++++++++++++++
 .../hive/GenericHiveMetastoreCatalogUtil.java      |  49 +++
 .../src/main/resources/META-INF/NOTICE             |  26 ++
 .../main/resources/META-INF/licenses/LICENSE.antlr |  38 ++
 .../hive/GenericHiveMetastoreCatalogTest.java      |  83 ++++
 .../flink/table/catalog/hive/HiveTestUtils.java    |  54 +++
 .../src/test/resources/hive-site.xml               |  42 ++
 .../src/test/resources/log4j-test.properties       |  24 ++
 flink-connectors/pom.xml                           |   1 +
 flink-table/flink-table-api-java/pom.xml           |  10 +
 .../table/catalog/GenericCatalogDatabase.java      |   2 +-
 .../table/catalog/GenericInMemoryCatalogTest.java  | 248 +++---------
 .../flink/table/catalog/CatalogTestBase.java       | 241 ++++++++++++
 14 files changed, 1396 insertions(+), 203 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
new file mode 100644
index 0000000..cb09934
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -0,0 +1,429 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
+       <name>flink-connector-hive</name>
+
+       <packaging>jar</packaging>
+
+       <properties>
+               <hive.version>2.3.4</hive.version>
+               <hivemetastore.hadoop.version>2.7.2</hivemetastore.hadoop.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Hadoop dependency -->
+               <!-- Hadoop as provided dependencies, so we can depend on them without pulling in Hadoop -->
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+                       <version>${hivemetastore.hadoop.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-mapreduce-client-core</artifactId>
+                       <version>${hivemetastore.hadoop.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Hive dependencies -->
+               <!-- Note: Hive published jars do not have proper dependencies declared.
+               We need to push for HIVE-16391 (https://issues.apache.org/jira/browse/HIVE-16391) to resolve this problem. -->
+
+               <dependency>
+                       <groupId>org.apache.hive</groupId>
+                       <artifactId>hive-metastore</artifactId>
+                       <version>${hive.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.hive</groupId>
+                                       <artifactId>hive-shims</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>javolution</groupId>
+                                       <artifactId>javolution</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.google.guava</groupId>
+                                       <artifactId>guava</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.google.protobuf</groupId>
+                                       <artifactId>protobuf-java</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.derby</groupId>
+                                       <artifactId>derby</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hbase</groupId>
+                                       <artifactId>hbase-client</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-lang</groupId>
+                                       <artifactId>commons-lang</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.zaxxer</groupId>
+                                       <artifactId>HikariCP</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>javax.jdo</groupId>
+                                       <artifactId>jdo-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>co.cask.tephra</groupId>
+                                       <artifactId>tephra-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>co.cask.tephra</groupId>
+                                       <artifactId>tephra-core</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>co.cask.tephra</groupId>
+                                       
<artifactId>tephra-hbase-compat-1.0</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-cli</groupId>
+                                       <artifactId>commons-cli</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.thrift</groupId>
+                                       <artifactId>libfb303</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>javax.transaction</groupId>
+                                       <artifactId>transaction-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.orc</groupId>
+                                       <artifactId>orc-core</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>joda-time</groupId>
+                                       <artifactId>joda-time</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.apache.logging.log4j</groupId>
+                                       <artifactId>log4j-1.2-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.apache.logging.log4j</groupId>
+                                       
<artifactId>log4j-slf4j-impl</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.ant</groupId>
+                                       <artifactId>ant</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.tdunning</groupId>
+                                       <artifactId>json</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>jline</groupId>
+                                       <artifactId>jline</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.eclipse.jetty.aggregate</groupId>
+                                       <artifactId>jetty-all</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.eclipse.jetty.orbit</groupId>
+                                       <artifactId>javax.servlet</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.apache.logging.log4j</groupId>
+                                       <artifactId>log4j-web</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>io.dropwizard.metrics</groupId>
+                                       <artifactId>metrics-core</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>io.dropwizard.metrics</groupId>
+                                       <artifactId>metrics-jvm</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>io.dropwizard.metrics</groupId>
+                                       <artifactId>metrics-json</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>com.fasterxml.jackson.core</groupId>
+                                       
<artifactId>jackson-databind</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.github.joshelser</groupId>
+                                       
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
+                               </exclusion>
+
+                               <!-- org.apache.hive:hive-service-rpc -->
+                               <exclusion>
+                                       <groupId>tomcat</groupId>
+                                       <artifactId>jasper-compiler</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>tomcat</groupId>
+                                       <artifactId>jasper-runtime</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.apache.httpcomponents</groupId>
+                                       <artifactId>httpclient</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.apache.httpcomponents</groupId>
+                                       <artifactId>httpcore</artifactId>
+                               </exclusion>
+
+                               <!-- org.apache.hive:hive-serde -->
+                               <exclusion>
+                                       <groupId>commons-codec</groupId>
+                                       <artifactId>commons-codec</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.avro</groupId>
+                                       <artifactId>avro</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>net.sf.opencsv</groupId>
+                                       <artifactId>opencsv</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.parquet</groupId>
+                                       
<artifactId>parquet-hadoop-bundle</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.hive</groupId>
+                       <artifactId>hive-exec</artifactId>
+                       <version>${hive.version}</version>
+                       <scope>test</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.hive</groupId>
+                                       
<artifactId>hive-vector-code-gen</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hive</groupId>
+                                       <artifactId>hive-llap-tez</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hive</groupId>
+                                       <artifactId>hive-shims</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-codec</groupId>
+                                       <artifactId>commons-codec</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>commons-httpclient</groupId>
+                                       
<artifactId>commons-httpclient</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.apache.logging.log4j</groupId>
+                                       <artifactId>log4j-1.2-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.apache.logging.log4j</groupId>
+                                       
<artifactId>log4j-slf4j-impl</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.antlr</groupId>
+                                       <artifactId>antlr-runtime</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.antlr</groupId>
+                                       <artifactId>ST4</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.ant</groupId>
+                                       <artifactId>ant</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.commons</groupId>
+                                       
<artifactId>commons-compress</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.ivy</groupId>
+                                       <artifactId>ivy</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.zookeeper</groupId>
+                                       <artifactId>zookeeper</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.curator</groupId>
+                                       <artifactId>apache-curator</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.curator</groupId>
+                                       
<artifactId>curator-framework</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.codehaus.groovy</groupId>
+                                       <artifactId>groovy-all</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.calcite</groupId>
+                                       <artifactId>calcite-core</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.calcite</groupId>
+                                       <artifactId>calcite-druid</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       
<groupId>org.apache.calcite.avatica</groupId>
+                                       <artifactId>avatica</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.google.code.gson</groupId>
+                                       <artifactId>gson</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>stax</groupId>
+                                       <artifactId>stax-api</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.derby</groupId>
+                       <artifactId>derby</artifactId>
+                       <version>10.10.2.0</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<shadeTestJar>false</shadeTestJar>
+                                                       <artifactSet>
+                                                               <!-- Needs 
end-to-end tests to ensure the built flink-connector-hive jar contains all 
required dependencies and can run -->
+                                                               <includes>
+                                                                       
<include>commons-dbcp:commons-dbcp</include>
+                                                                       
<include>commons-pool:commons-pool</include>
+                                                                       
<include>commons-beanutils:commons-beanutils</include>
+                                                                       
<include>com.jolbox:bonecp</include>
+                                                                       
<include>org.apache.hive:*</include>
+                                                                       
<include>org.apache.thrift:libthrift</include>
+                                                                       
<include>org.datanucleus:*</include>
+                                                                       
<include>org.antlr:antlr-runtime</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+                                                       <!-- DO NOT RELOCATE 
GUAVA IN THIS PACKAGE -->
+                                                       <filters>
+                                                               <filter>
+                                                                       <!-- 
some dependencies bring their own LICENSE.txt which we don't need -->
+                                                                       
<artifact>*:*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>META-INF/LICENSE.txt</exclude>
+                                                                               
<exclude>META-INF/ASM_LICENSE.txt</exclude>
+                                                                               
<exclude>META-INF/ASL2.0</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Configure derby.log of embedded Hive metastore for unit tests -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <forkCount>1</forkCount>
+                                       <reuseForks>false</reuseForks>
+                                       <systemPropertyVariables>
+                                               
<derby.stream.error.file>${project.build.directory}/derby.log</derby.stream.error.file>
+                                       </systemPropertyVariables>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+</project>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
new file mode 100644
index 0000000..4c07938
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage.
+ *
+ * <p>The metastore client is created in {@link #open()} and released in {@link #close()}. All metadata
+ * operations dereference that client, so {@link #open()} has to be called before any of them.
+ *
+ * <p>Only database operations are implemented so far. Table, view, partition and function operations
+ * throw {@link UnsupportedOperationException}.
+ */
+public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
+       private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
+
+       public static final String DEFAULT_DB = "default";
+
+       private final String catalogName;
+       private final HiveConf hiveConf;
+
+       private String currentDatabase = DEFAULT_DB;
+       // Null until open() is called, and reset to null again by close()
+       private IMetaStoreClient client;
+
+       /**
+        * Creates a catalog that talks to the Hive metastore at the given URI.
+        *
+        * @param catalogName name of this catalog, must not be null or blank
+        * @param hivemetastoreURI URI of the Hive metastore, must not be null or blank
+        */
+       public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
+               this(catalogName, getHiveConf(hivemetastoreURI));
+       }
+
+       /**
+        * Creates a catalog from a ready-made Hive configuration. No connection is established here,
+        * see {@link #open()}.
+        *
+        * @param catalogName name of this catalog, must not be null or blank
+        * @param hiveConf Hive configuration used to create the metastore client, must not be null
+        */
+       public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+               this.catalogName = catalogName;
+
+               this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
+               LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
+       }
+
+       /**
+        * Builds a default HiveConf whose metastore URI is set to the given value.
+        */
+       private static HiveConf getHiveConf(String hiveMetastoreURI) {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
+
+               HiveConf hiveConf = new HiveConf();
+               hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+               return hiveConf;
+       }
+
+       /**
+        * Creates a retrying metastore client for the given configuration, translating Hive's
+        * MetaException into a CatalogException.
+        */
+       private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+               try {
+                       return RetryingMetaStoreClient.getProxy(
+                               hiveConf,
+                               null,
+                               null,
+                               HiveMetaStoreClient.class.getName(),
+                               true);
+               } catch (MetaException e) {
+                       throw new CatalogException("Failed to create Hive metastore client", e);
+               }
+       }
+
+       @Override
+       public void open() throws CatalogException {
+               // Calling open() on an already opened catalog is a no-op
+               if (client == null) {
+                       client = getMetastoreClient(hiveConf);
+                       LOG.info("Connected to Hive metastore");
+               }
+       }
+
+       @Override
+       public void close() throws CatalogException {
+               // Calling close() on a catalog that is not open is a no-op
+               if (client != null) {
+                       client.close();
+                       client = null;
+                       LOG.info("Close connection to Hive metastore");
+               }
+       }
+
+       // ------ databases ------
+
+       @Override
+       public String getCurrentDatabase() throws CatalogException {
+               return currentDatabase;
+       }
+
+       @Override
+       public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+
+               if (!databaseExists(databaseName)) {
+                       throw new DatabaseNotExistException(catalogName, databaseName);
+               }
+
+               currentDatabase = databaseName;
+       }
+
+       @Override
+       public List<String> listDatabases() throws CatalogException {
+               try {
+                       return client.getAllDatabases();
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list all databases in %s", catalogName), e);
+               }
+       }
+
+       @Override
+       public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+               Database hiveDb;
+
+               try {
+                       hiveDb = client.getDatabase(databaseName);
+               } catch (NoSuchObjectException e) {
+                       throw new DatabaseNotExistException(catalogName, databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get database %s from %s", databaseName, catalogName), e);
+               }
+
+               return new GenericCatalogDatabase(hiveDb.getParameters(), hiveDb.getDescription());
+       }
+
+       @Override
+       public boolean databaseExists(String databaseName) throws CatalogException {
+               try {
+                       return client.getDatabase(databaseName) != null;
+               } catch (NoSuchObjectException e) {
+                       // A missing database is an expected answer here, not an error
+                       return false;
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to determine whether database %s exists or not", databaseName), e);
+               }
+       }
+
+       @Override
+       public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
+               // Fail with a descriptive message instead of an anonymous NPE inside the conversion util
+               checkNotNull(database, "database cannot be null");
+
+               try {
+                       client.createDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database));
+               } catch (AlreadyExistsException e) {
+                       if (!ignoreIfExists) {
+                               throw new DatabaseAlreadyExistException(catalogName, name);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to create database %s", name), e);
+               }
+       }
+
+       @Override
+       public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+               try {
+                       // Arguments after the name: deleteData = true, ignoreUnknownDb = ignoreIfNotExists
+                       client.dropDatabase(name, true, ignoreIfNotExists);
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new DatabaseNotExistException(catalogName, name);
+                       }
+               } catch (InvalidOperationException e) {
+                       // The "not empty" case is only recognizable by the message text.
+                       // getMessage() may return null, so guard before matching on it.
+                       String message = e.getMessage();
+                       if (message != null && message.startsWith(String.format("Database %s is not empty", name))) {
+                               throw new DatabaseNotEmptyException(catalogName, name);
+                       } else {
+                               throw new CatalogException(String.format("Failed to drop database %s", name), e);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to drop database %s", name), e);
+               }
+       }
+
+       @Override
+       public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
+               try {
+                       // Existence is checked up front so that a missing database can be reported
+                       // (or ignored) according to ignoreIfNotExists
+                       if (databaseExists(name)) {
+                               client.alterDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase));
+                       } else if (!ignoreIfNotExists) {
+                               throw new DatabaseNotExistException(catalogName, name);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to alter database %s", name), e);
+               }
+       }
+
+       // ------ tables and views (not supported yet) ------
+
+       @Override
+       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+                       throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+                       throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<String> listTables(String databaseName)
+                       throws DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean tableExists(ObjectPath objectPath) throws CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       // ------ partitions (not supported yet) ------
+
+       @Override
+       public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+                       throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+                       throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+                       throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+                       throws TableNotExistException, TableNotPartitionedException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                       throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                       throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                       throws CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       // ------ functions (not supported yet) ------
+
+       @Override
+       public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+                       throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+                       throws FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+                       throws FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+               throw new UnsupportedOperationException();
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
new file mode 100644
index 0000000..779905a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+
+import java.util.Map;
+
+
+/**
+ * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
+ */
+public class GenericHiveMetastoreCatalogUtil {
+
+       // Static helpers only, not meant to be instantiated
+       private GenericHiveMetastoreCatalogUtil() {
+       }
+
+       // ------ Utils ------
+
+       /**
+        * Creates a Hive database from CatalogDatabase.
+        *
+        * @param dbName name of the Hive database to create
+        * @param db Flink database whose properties and optional description are copied over
+        * @return a Hive Database carrying the given name, the description (null when absent)
+        *         and the properties of {@code db}
+        */
+       public static Database createHiveDatabase(String dbName, CatalogDatabase db) {
+               Map<String, String> props = db.getProperties();
+               return new Database(
+                       dbName,
+                       // Hive takes a plain, nullable String, so unwrap the Optional once
+                       db.getDescription().orElse(null),
+                       // No explicit location is passed for the database
+                       null,
+                       props);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE 
b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..ea1c160
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,26 @@
+flink-connector-hive
+Copyright 2014-2019 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- commons-dbcp:commons-dbcp:1.4
+- commons-pool:commons-pool:1.5.4
+- com.jolbox:bonecp:0.8.0.RELEASE
+- org.apache.hive:hive-common:2.3.4
+- org.apache.hive:hive-metastore:2.3.4
+- org.apache.hive:hive-serde:2.3.4
+- org.apache.hive:hive-service-rpc:2.3.4
+- org.apache.hive:hive-storage-api:2.4.0
+- org.apache.thrift:libthrift:0.9.3
+- org.datanucleus:datanucleus-api-jdo:4.2.4
+- org.datanucleus:datanucleus-core:4.1.17
+- org.datanucleus:datanucleus-rdbms:4.1.19
+- org.datanucleus:javax.jdo:3.2.0-m3
+
+This project bundles the following dependencies under the BSD license.
+See bundled license files for details.
+
+- org.antlr:antlr-runtime:3.5.2
diff --git 
a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr
 
b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr
new file mode 100644
index 0000000..0af2cce
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr
@@ -0,0 +1,38 @@
+(BSD License: http://www.opensource.org/licenses/bsd-license)
+
+Copyright (c) 2012 Terence Parr and Sam Harwell
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the
+following conditions are met:
+
+* Redistributions of source code must retain the above
+  copyright notice, this list of conditions and the
+  following disclaimer.
+
+* Redistributions in binary form must reproduce the above
+  copyright notice, this list of conditions and the
+  following disclaimer in the documentation and/or other
+  materials provided with the distribution.
+
+* Neither the name of the Webbit nor the names of
+  its contributors may be used to endorse or promote products
+  derived from this software without specific prior written
+  permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
+GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
new file mode 100644
index 0000000..642c1c2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+/**
+ * Test for GenericHiveMetastoreCatalog.
+ *
+ * <p>The shared test cases come from CatalogTestBase; this class supplies the catalog under test
+ * and the fixtures those cases ask for.
+ */
+public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
+
+       @BeforeClass
+       public static void init() throws IOException {
+               catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+               catalog.open();
+       }
+
+       // =====================
+       // GenericHiveMetastoreCatalog doesn't support table operations yet.
+       // Thus, the following tests from CatalogTestBase that involve table operations are overridden
+       // with empty bodies so they won't run against GenericHiveMetastoreCatalog.
+       // =====================
+
+       // TODO: re-enable this test once GenericHiveMetastoreCatalog supports table operations
+       @Test
+       public void testDropDb_DatabaseNotEmptyException() throws Exception {
+       }
+
+       // ------ utils ------
+
+       @Override
+       public String getBuiltInDefaultDatabase() {
+               return GenericHiveMetastoreCatalog.DEFAULT_DB;
+       }
+
+       @Override
+       public CatalogDatabase createDb() {
+               return new GenericCatalogDatabase(singleProperty("k1", "v1"), TEST_COMMENT);
+       }
+
+       @Override
+       public CatalogDatabase createAnotherDb() {
+               return new GenericCatalogDatabase(singleProperty("k2", "v2"), TEST_COMMENT);
+       }
+
+       @Override
+       public CatalogTable createTable() {
+               // TODO: implement this once GenericHiveMetastoreCatalog supports table operations
+               return null;
+       }
+
+       /**
+        * Builds a mutable map holding exactly one entry. Used instead of double-brace initialization,
+        * which would create an anonymous HashMap subclass holding a reference to this test instance.
+        */
+       private static HashMap<String, String> singleProperty(String key, String value) {
+               HashMap<String, String> properties = new HashMap<>();
+               properties.put(key, value);
+               return properties;
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
new file mode 100644
index 0000000..83f5bed
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/**
+ * Test utils for Hive connector.
+ */
+public class HiveTestUtils {
+       private static final String HIVE_SITE_XML = "hive-site.xml";
+       // JDBC URL template of the embedded Derby database that backs the metastore
+       private static final String HIVE_METASTORE_DB_URL_FORMAT = "jdbc:derby:;databaseName=%s;create=true";
+       // NOTE(review): used as a plain helper rather than a JUnit rule, and delete() is never called
+       // in this class, so the created folders are presumably left behind after the tests - confirm.
+       private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+       /**
+        * Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore.
+        *
+        * @return a catalog named "test" that is configured but not yet opened
+        * @throws IOException if the temporary metastore or warehouse folders cannot be created
+        */
+       public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException {
+               return new GenericHiveMetastoreCatalog("test", getHiveConf());
+       }
+
+       /**
+        * Builds a HiveConf that reads hive-site.xml from the test classpath and points both the
+        * metastore database and the warehouse directory at fresh temporary folders.
+        */
+       private static HiveConf getHiveConf() throws IOException {
+               // Use the class literal; no need to instantiate the utility class to reach its class loader
+               ClassLoader classLoader = HiveTestUtils.class.getClassLoader();
+               // Must be set before the HiveConf below is constructed so that the file gets picked up
+               HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
+
+               TEMPORARY_FOLDER.create();
+               String metastoreDbDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
+               String metastoreDbUrl = String.format(HIVE_METASTORE_DB_URL_FORMAT, metastoreDbDir);
+               HiveConf hiveConf = new HiveConf();
+               hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
+               hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, metastoreDbUrl);
+
+               return hiveConf;
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml 
b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..c83bab8
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+
+       <property>
+               <name>hive.metastore.schema.verification</name>
+               <value>false</value>
+       </property>
+
+       <property>
+               <name>hive.metastore.client.capability.check</name>
+               <value>false</value>
+       </property>
+
+       <property>
+               <name>datanucleus.schema.autoCreateTables</name>
+               <value>true</value>
+       </property>
+
+       <property>
+               <name>datanucleus.schema.autoCreateAll</name>
+               <value>true</value>
+       </property>
+
+</configuration>
diff --git 
a/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fcd8654
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 92058fa..e6d601d 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -49,6 +49,7 @@ under the License.
                <module>flink-connector-elasticsearch2</module>
                <module>flink-connector-elasticsearch5</module>
                <module>flink-connector-elasticsearch6</module>
+               <module>flink-connector-hive</module>
                <module>flink-connector-rabbitmq</module>
                <module>flink-connector-twitter</module>
                <module>flink-connector-nifi</module>
diff --git a/flink-table/flink-table-api-java/pom.xml 
b/flink-table/flink-table-api-java/pom.xml
index a6bf656..f8a8fd7 100644
--- a/flink-table/flink-table-api-java/pom.xml
+++ b/flink-table/flink-table-api-java/pom.xml
@@ -42,5 +42,15 @@ under the License.
                        <artifactId>flink-table-common</artifactId>
                        <version>${project.version}</version>
                </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 </project>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
index 5f2c732..959247a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
@@ -38,7 +38,7 @@ public class GenericCatalogDatabase implements 
CatalogDatabase {
 
        public GenericCatalogDatabase(Map<String, String> properties, String 
comment) {
                this(properties);
-               this.comment = comment;
+               this.comment = checkNotNull(comment, "comment cannot be null");
        }
 
        public Map<String, String> getProperties() {
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index d96d8b5..50d203a 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
@@ -35,10 +33,8 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.functions.ScalarFunction;
 
 import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -55,37 +51,14 @@ import static org.junit.Assert.assertTrue;
 /**
  * Test for GenericInMemoryCatalog.
  */
-public class GenericInMemoryCatalogTest {
-       private static final String IS_STREAMING = "is_streaming";
-
-       private final String testCatalogName = "test-catalog";
-       private final String db1 = "db1";
-       private final String db2 = "db2";
-       private final String nonExistantDatabase = "non-existant-db";
-
-       private final String t1 = "t1";
-       private final String t2 = "t2";
-       private final ObjectPath path1 = new ObjectPath(db1, t1);
-       private final ObjectPath path2 = new ObjectPath(db2, t2);
-       private final ObjectPath path3 = new ObjectPath(db1, t2);
-       private final ObjectPath path4 = new ObjectPath(db1, "t3");
-       private final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
-       private final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
-
-       private static final String TEST_COMMENT = "test comment";
-       private static final String TABLE_COMMENT = "This is my batch table";
-
-       private static ReadableWritableCatalog catalog;
-
-       @Before
-       public void setUp() {
-               catalog = new GenericInMemoryCatalog(testCatalogName);
+public class GenericInMemoryCatalogTest extends CatalogTestBase {
+
+       @BeforeClass
+       public static void init() {
+               catalog = new GenericInMemoryCatalog(TEST_CATALOG_NAME);
                catalog.open();
        }
 
-       @Rule
-       public ExpectedException exception = ExpectedException.none();
-
        @After
        public void close() throws Exception {
                if (catalog.tableExists(path1)) {
@@ -100,13 +73,6 @@ public class GenericInMemoryCatalogTest {
                if (catalog.functionExists(path1)) {
                        catalog.dropFunction(path1, true);
                }
-               if (catalog.databaseExists(db1)) {
-                       catalog.dropDatabase(db1, true);
-               }
-               if (catalog.databaseExists(db2)) {
-                       catalog.dropDatabase(db2, true);
-               }
-               catalog.close();
        }
 
        // ------ tables ------
@@ -486,136 +452,6 @@ public class GenericInMemoryCatalogTest {
                assertEquals(Arrays.asList(path1.getObjectName()), 
catalog.listViews(db1));
        }
 
-       // ------ databases ------
-
-       @Test
-       public void testCreateDb() throws Exception {
-               catalog.createDatabase(db2, createDb(), false);
-
-               assertEquals(2, catalog.listDatabases().size());
-       }
-
-       @Test
-       public void testSetCurrentDatabase() throws Exception {
-               assertEquals(GenericInMemoryCatalog.DEFAULT_DB, 
catalog.getCurrentDatabase());
-               catalog.createDatabase(db2, createDb(), true);
-               catalog.setCurrentDatabase(db2);
-               assertEquals(db2, catalog.getCurrentDatabase());
-               catalog.setCurrentDatabase(GenericInMemoryCatalog.DEFAULT_DB);
-               assertEquals(GenericInMemoryCatalog.DEFAULT_DB, 
catalog.getCurrentDatabase());
-               catalog.dropDatabase(db2, false);
-       }
-
-       @Test
-       public void testSetCurrentDatabaseNegative() throws Exception {
-               exception.expect(DatabaseNotExistException.class);
-               exception.expectMessage("Database " + this.nonExistantDatabase 
+ " does not exist in Catalog");
-               catalog.setCurrentDatabase(this.nonExistantDatabase);
-       }
-
-       @Test
-       public void testCreateDb_DatabaseAlreadyExistException() throws 
Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               exception.expect(DatabaseAlreadyExistException.class);
-               exception.expectMessage("Database db1 already exists in 
Catalog");
-               catalog.createDatabase(db1, createDb(), false);
-       }
-
-       @Test
-       public void testCreateDb_DatabaseAlreadyExist_ignored() throws 
Exception {
-               CatalogDatabase cd1 = createDb();
-               catalog.createDatabase(db1, cd1, false);
-               List<String> dbs = catalog.listDatabases();
-
-               
assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
-               assertEquals(2, dbs.size());
-               assertEquals(new HashSet<>(Arrays.asList(db1, 
catalog.getCurrentDatabase())), new HashSet<>(dbs));
-
-               catalog.createDatabase(db1, createAnotherDb(), true);
-
-               
assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
-               assertEquals(2, dbs.size());
-               assertEquals(new HashSet<>(Arrays.asList(db1, 
catalog.getCurrentDatabase())), new HashSet<>(dbs));
-       }
-
-       @Test
-       public void testGetDb_DatabaseNotExistException() throws Exception {
-               exception.expect(DatabaseNotExistException.class);
-               exception.expectMessage("Database nonexistent does not exist in 
Catalog");
-               catalog.getDatabase("nonexistent");
-       }
-
-       @Test
-       public void testDropDb() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               assertTrue(catalog.listDatabases().contains(db1));
-
-               catalog.dropDatabase(db1, false);
-
-               assertFalse(catalog.listDatabases().contains(db1));
-       }
-
-       @Test
-       public void testDropDb_DatabaseNotExistException() throws Exception {
-               exception.expect(DatabaseNotExistException.class);
-               exception.expectMessage("Database db1 does not exist in 
Catalog");
-               catalog.dropDatabase(db1, false);
-       }
-
-       @Test
-       public void testDropDb_DatabaseNotExist_Ignore() throws Exception {
-               catalog.dropDatabase(db1, true);
-       }
-
-       @Test
-       public void testDropDb_databaseIsNotEmpty() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createTable(), false);
-
-               exception.expect(DatabaseNotEmptyException.class);
-               exception.expectMessage("Database db1 in Catalog test-catalog 
is not empty");
-               catalog.dropDatabase(db1, true);
-       }
-
-       @Test
-       public void testAlterDb() throws Exception {
-               CatalogDatabase db = createDb();
-               catalog.createDatabase(db1, db, false);
-
-               
assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet()));
-
-               CatalogDatabase newDb = createAnotherDb();
-               catalog.alterDatabase(db1, newDb, false);
-
-               
assertFalse(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet()));
-               
assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(newDb.getProperties().entrySet()));
-       }
-
-       @Test
-       public void testAlterDb_DatabaseNotExistException() throws Exception {
-               exception.expect(DatabaseNotExistException.class);
-               exception.expectMessage("Database nonexistent does not exist in 
Catalog");
-               catalog.alterDatabase("nonexistent", createDb(), false);
-       }
-
-       @Test
-       public void testAlterDb_DatabaseNotExist_ignored() throws Exception {
-               catalog.alterDatabase("nonexistent", createDb(), true);
-
-               assertFalse(catalog.databaseExists("nonexistent"));
-       }
-
-       @Test
-       public void testDbExists() throws Exception {
-               assertFalse(catalog.databaseExists("nonexistent"));
-
-               catalog.createDatabase(db1, createDb(), false);
-
-               assertTrue(catalog.databaseExists(db1));
-       }
-
        @Test
        public void testRenameView() throws Exception {
                catalog.createDatabase("db1", new GenericCatalogDatabase(new 
HashMap<>()), false);
@@ -660,7 +496,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionSpecInvalidException.class);
                exception.expectMessage(
                        String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s",
-                               invalid, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+                               invalid, table.getPartitionKeys(), 
path1.getFullName(), TEST_CATALOG_NAME));
                catalog.listPartitions(path1, invalid);
        }
 
@@ -670,7 +506,7 @@ public class GenericInMemoryCatalogTest {
 
                exception.expect(TableNotExistException.class);
                exception.expectMessage(
-                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), testCatalogName));
+                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
                catalog.createPartition(path1, createPartitionSpec(), 
createPartition(), false);
        }
 
@@ -681,7 +517,7 @@ public class GenericInMemoryCatalogTest {
 
                exception.expect(TableNotPartitionedException.class);
                exception.expectMessage(
-                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), testCatalogName));
+                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
                catalog.createPartition(path1, createPartitionSpec(), 
createPartition(), false);
        }
 
@@ -695,7 +531,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionSpecInvalidException.class);
                exception.expectMessage(
                        String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
-                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), TEST_CATALOG_NAME));
                catalog.createPartition(path1, partitionSpec, 
createPartition(), false);
        }
 
@@ -711,7 +547,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionAlreadyExistsException.class);
                exception.expectMessage(
                        String.format("Partition %s of table %s in catalog %s 
already exists.",
-                               partitionSpec, path1.getFullName(), 
testCatalogName));
+                               partitionSpec, path1.getFullName(), 
TEST_CATALOG_NAME));
                catalog.createPartition(path1, partitionSpec, 
createPartition(), false);
        }
 
@@ -744,7 +580,7 @@ public class GenericInMemoryCatalogTest {
 
                exception.expect(TableNotExistException.class);
                exception.expectMessage(
-                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), testCatalogName));
+                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
                catalog.dropPartition(path1, createPartitionSpec(), false);
        }
 
@@ -755,7 +591,7 @@ public class GenericInMemoryCatalogTest {
 
                exception.expect(TableNotPartitionedException.class);
                exception.expectMessage(
-                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), testCatalogName));
+                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
                catalog.dropPartition(path1, createPartitionSpec(), false);
        }
 
@@ -769,7 +605,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionSpecInvalidException.class);
                exception.expectMessage(
                        String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
-                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), TEST_CATALOG_NAME));
                catalog.dropPartition(path1, partitionSpec, false);
        }
 
@@ -781,7 +617,7 @@ public class GenericInMemoryCatalogTest {
                CatalogPartitionSpec partitionSpec = createPartitionSpec();
                exception.expect(PartitionNotExistException.class);
                exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s 
does not exist.", partitionSpec, path1.getFullName(), testCatalogName));
+                       String.format("Partition %s of table %s in catalog %s 
does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
                catalog.dropPartition(path1, partitionSpec, false);
        }
 
@@ -828,7 +664,7 @@ public class GenericInMemoryCatalogTest {
                CatalogPartitionSpec partitionSpec = createPartitionSpec();
                exception.expect(TableNotExistException.class);
                exception.expectMessage(
-                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), testCatalogName));
+                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
                catalog.alterPartition(path1, partitionSpec, createPartition(), 
false);
        }
 
@@ -840,7 +676,7 @@ public class GenericInMemoryCatalogTest {
                CatalogPartitionSpec partitionSpec = createPartitionSpec();
                exception.expect(TableNotPartitionedException.class);
                exception.expectMessage(
-                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), testCatalogName));
+                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
                catalog.alterPartition(path1, partitionSpec, createPartition(), 
false);
        }
 
@@ -854,7 +690,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionSpecInvalidException.class);
                exception.expectMessage(
                        String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
-                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), TEST_CATALOG_NAME));
                catalog.alterPartition(path1, partitionSpec, createPartition(), 
false);
        }
 
@@ -868,7 +704,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionNotExistException.class);
                exception.expectMessage(
                        String.format("Partition %s of table %s in catalog %s 
does not exist.",
-                               partitionSpec, path1.getFullName(), 
testCatalogName));
+                               partitionSpec, path1.getFullName(), 
TEST_CATALOG_NAME));
                catalog.alterPartition(path1, partitionSpec, catalogPartition, 
false);
        }
 
@@ -892,7 +728,7 @@ public class GenericInMemoryCatalogTest {
 
                exception.expect(TableNotPartitionedException.class);
                exception.expectMessage(
-                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), testCatalogName));
+                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
                catalog.getPartition(path1, createPartitionSpec());
        }
 
@@ -906,7 +742,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionSpecInvalidException.class);
                exception.expectMessage(
                        String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
-                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), TEST_CATALOG_NAME));
                catalog.getPartition(path1, partitionSpec);
        }
 
@@ -924,7 +760,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionSpecInvalidException.class);
                exception.expectMessage(
                        String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
-                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), TEST_CATALOG_NAME));
                catalog.getPartition(path1, partitionSpec);
        }
 
@@ -937,7 +773,7 @@ public class GenericInMemoryCatalogTest {
                exception.expect(PartitionNotExistException.class);
                exception.expectMessage(
                        String.format("Partition %s of table %s in catalog %s 
does not exist.",
-                               partitionSpec, path1.getFullName(), 
testCatalogName));
+                               partitionSpec, path1.getFullName(), 
TEST_CATALOG_NAME));
                catalog.getPartition(path1, partitionSpec);
        }
 
@@ -1106,13 +942,33 @@ public class GenericInMemoryCatalogTest {
 
        // ------ utilities ------
 
+       /**
+        * Returns {@link GenericInMemoryCatalog#DEFAULT_DB}, the name the shared database tests
+        * compare against the catalog's current database.
+        */
+       @Override
+       public String getBuiltInDefaultDatabase() {
+               return GenericInMemoryCatalog.DEFAULT_DB;
+       }
+
+       /**
+        * Builds the database used by the shared database tests: a {@link GenericCatalogDatabase}
+        * with the single property {@code k1=v1} and {@code TEST_COMMENT} as its comment.
+        */
+       @Override
+       public CatalogDatabase createDb() {
+               return new GenericCatalogDatabase(new HashMap<String, String>() 
{{
+                       put("k1", "v1");
+               }}, TEST_COMMENT);
+       }
+
+       /**
+        * Builds a second database whose properties differ from {@link #createDb()}: a
+        * {@link GenericCatalogDatabase} with the single property {@code k2=v2} and its own comment.
+        */
+       @Override
+       public CatalogDatabase createAnotherDb() {
+               return new GenericCatalogDatabase(new HashMap<String, String>() 
{{
+                       put("k2", "v2");
+               }}, "this is another database.");
+       }
+
        private GenericCatalogTable createStreamingTable() {
                return CatalogTestUtil.createTable(
                        createTableSchema(),
                        getStreamingTableProperties(), TABLE_COMMENT);
        }
 
-       private GenericCatalogTable createTable() {
+       @Override
+       public GenericCatalogTable createTable() {
                return CatalogTestUtil.createTable(
                        createTableSchema(),
                        getBatchTableProperties(), TABLE_COMMENT);
@@ -1186,12 +1042,6 @@ public class GenericInMemoryCatalogTest {
                return new GenericCatalogPartition(props);
        }
 
-       private CatalogDatabase createDb() {
-               return new GenericCatalogDatabase(new HashMap<String, String>() 
{{
-                       put("k1", "v1");
-               }}, TEST_COMMENT);
-       }
-
        private Map<String, String> getBatchTableProperties() {
                return new HashMap<String, String>() {{
                        put(IS_STREAMING, "false");
@@ -1204,12 +1054,6 @@ public class GenericInMemoryCatalogTest {
                }};
        }
 
-       private CatalogDatabase createAnotherDb() {
-               return new GenericCatalogDatabase(new HashMap<String, String>() 
{{
-                       put("k2", "v2");
-               }}, "this is another database.");
-       }
-
        private TableSchema createTableSchema() {
                return new TableSchema(
                        new String[] {"first", "second", "third"},
@@ -1235,7 +1079,7 @@ public class GenericInMemoryCatalogTest {
        private CatalogView createView() {
                return new GenericCatalogView(
                        String.format("select * from %s", t1),
-                       String.format("select * from %s.%s", testCatalogName, 
path1.getFullName()),
+                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path1.getFullName()),
                        createTableSchema(),
                        new HashMap<>(),
                        "This is a view");
@@ -1244,7 +1088,7 @@ public class GenericInMemoryCatalogTest {
        private CatalogView createAnotherView() {
                return new GenericCatalogView(
                        String.format("select * from %s", t2),
-                       String.format("select * from %s.%s", testCatalogName, 
path2.getFullName()),
+                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path2.getFullName()),
                        createTableSchema(),
                        new HashMap<>(),
                        "This is another view");
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
new file mode 100644
index 0000000..83a8f2b
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base testing class for unit tests of a specific catalog, like 
GenericInMemoryCatalog and HiveCatalog.
+ */
+public abstract class CatalogTestBase {
+       protected static final String IS_STREAMING = "is_streaming";
+
+       protected final String db1 = "db1";
+       protected final String db2 = "db2";
+       protected final String nonExistentDatabase = "non-existent-db";
+
+       protected final String t1 = "t1";
+       protected final String t2 = "t2";
+       protected final ObjectPath path1 = new ObjectPath(db1, t1);
+       protected final ObjectPath path2 = new ObjectPath(db2, t2);
+       protected final ObjectPath path3 = new ObjectPath(db1, t2);
+       protected final ObjectPath path4 = new ObjectPath(db1, "t3");
+       protected final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
+       protected final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
+
+       protected static final String TEST_CATALOG_NAME = "test-catalog";
+       protected static final String TEST_COMMENT = "test comment";
+       protected static final String TABLE_COMMENT = "This is my batch table";
+
+       protected static ReadableWritableCatalog catalog;
+
+       @Rule
+       public ExpectedException exception = ExpectedException.none();
+
+       /**
+        * Runs after every test and drops the {@code db1} and {@code db2} databases if the
+        * shared catalog still contains them.
+        *
+        * <p>NOTE(review): presumably the drop only succeeds once the databases are empty, so
+        * subclasses are expected to remove their tables, views and functions first; confirm.
+        */
+       @After
+       public void cleanup() throws Exception {
+               if (catalog.databaseExists(db1)) {
+                       catalog.dropDatabase(db1, true);
+               }
+               if (catalog.databaseExists(db2)) {
+                       catalog.dropDatabase(db2, true);
+               }
+       }
+
+       /**
+        * Closes the shared catalog once all tests of the class have run.
+        *
+        * <p>The static {@code catalog} field is assigned by the concrete test class in its own
+        * class-level setup. If that setup failed before the assignment, the field is still
+        * {@code null}; the close is then skipped so that no NullPointerException is reported on
+        * top of the original setup failure.
+        */
+       @AfterClass
+       public static void closeup() {
+               if (catalog != null) {
+                       catalog.close();
+               }
+       }
+
+       // ------ databases ------
+
+       /**
+        * Creates {@code db2} and expects the catalog to list exactly two databases afterwards
+        * (presumably the built-in default database plus {@code db2}).
+        */
+       @Test
+       public void testCreateDb() throws Exception {
+               catalog.createDatabase(db2, createDb(), false);
+
+               assertEquals(2, catalog.listDatabases().size());
+       }
+
+       /**
+        * Switches the current database from the built-in default to {@code db2} and back,
+        * checking {@code getCurrentDatabase()} after each step, and finally drops {@code db2}.
+        */
+       @Test
+       public void testSetCurrentDatabase() throws Exception {
+               assertEquals(getBuiltInDefaultDatabase(), 
catalog.getCurrentDatabase());
+               catalog.createDatabase(db2, createDb(), true);
+               catalog.setCurrentDatabase(db2);
+               assertEquals(db2, catalog.getCurrentDatabase());
+               catalog.setCurrentDatabase(getBuiltInDefaultDatabase());
+               assertEquals(getBuiltInDefaultDatabase(), 
catalog.getCurrentDatabase());
+               catalog.dropDatabase(db2, false);
+       }
+
+       /**
+        * Expects {@code setCurrentDatabase} to throw {@link DatabaseNotExistException}, with a
+        * message naming the database, when given a name that no test ever creates.
+        */
+       @Test
+       public void testSetCurrentDatabaseNegative() throws Exception {
+               exception.expect(DatabaseNotExistException.class);
+               exception.expectMessage("Database " + this.nonExistentDatabase 
+ " does not exist in Catalog");
+               catalog.setCurrentDatabase(this.nonExistentDatabase);
+       }
+
+       /**
+        * Creates {@code db1} twice with the last argument set to {@code false}; the second call
+        * is expected to throw {@link DatabaseAlreadyExistException}.
+        */
+       @Test
+       public void testCreateDb_DatabaseAlreadyExistException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               // The expectation is registered only now so that it applies to the second call.
+               exception.expect(DatabaseAlreadyExistException.class);
+               exception.expectMessage("Database db1 already exists in 
Catalog");
+               catalog.createDatabase(db1, createDb(), false);
+       }
+
+       /**
+        * Creates {@code db1}, then creates it again from a different definition with the last
+        * argument set to {@code true}. The second call must neither throw nor change the catalog:
+        * {@code db1} keeps the properties of the first definition and the set of listed databases
+        * is still {@code db1} plus the current database.
+        */
+       @Test
+       public void testCreateDb_DatabaseAlreadyExist_ignored() throws Exception {
+               CatalogDatabase cd1 = createDb();
+               catalog.createDatabase(db1, cd1, false);
+               List<String> dbs = catalog.listDatabases();
+
+               assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
+               assertEquals(2, dbs.size());
+               assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
+
+               catalog.createDatabase(db1, createAnotherDb(), true);
+
+               // Re-list the databases: asserting on the list fetched before the second create
+               // would only re-check the old state and could never detect an unwanted change.
+               dbs = catalog.listDatabases();
+
+               assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
+               assertEquals(2, dbs.size());
+               assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
+       }
+
+       /**
+        * Expects {@code getDatabase} to throw {@link DatabaseNotExistException} for the name
+        * {@code "nonexistent"}, which no test creates.
+        */
+       @Test
+       public void testGetDb_DatabaseNotExistException() throws Exception {
+               exception.expect(DatabaseNotExistException.class);
+               exception.expectMessage("Database nonexistent does not exist in 
Catalog");
+               catalog.getDatabase("nonexistent");
+       }
+
+       /**
+        * Creates {@code db1}, checks that it is listed, drops it, and checks that it is no
+        * longer listed.
+        */
+       @Test
+       public void testDropDb() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               assertTrue(catalog.listDatabases().contains(db1));
+
+               catalog.dropDatabase(db1, false);
+
+               assertFalse(catalog.listDatabases().contains(db1));
+       }
+
+       /**
+        * Expects {@code dropDatabase} with the last argument set to {@code false} to throw
+        * {@link DatabaseNotExistException} when {@code db1} has not been created.
+        */
+       @Test
+       public void testDropDb_DatabaseNotExistException() throws Exception {
+               exception.expect(DatabaseNotExistException.class);
+               exception.expectMessage("Database db1 does not exist in 
Catalog");
+               catalog.dropDatabase(db1, false);
+       }
+
+       /** Dropping a missing database with {@code ignoreIfNotExists = true} must complete without throwing. */
+       @Test
+       public void testDropDb_DatabaseNotExist_Ignore() throws Exception {
+               final boolean ignoreIfNotExists = true;
+               catalog.dropDatabase(db1, ignoreIfNotExists);
+       }
+
+       /** Dropping a database that still holds a table must fail with {@code DatabaseNotEmptyException}. */
+       @Test
+       public void testDropDb_DatabaseNotEmptyException() throws Exception {
+               // Set up a database that contains one table.
+               final CatalogDatabase database = createDb();
+               catalog.createDatabase(db1, database, false);
+               final CatalogTable table = createTable();
+               catalog.createTable(path1, table, false);
+
+               // Expectations are registered only after the setup, so the drop is the call under test.
+               exception.expect(DatabaseNotEmptyException.class);
+               exception.expectMessage("Database db1 in Catalog test-catalog is not empty");
+
+               catalog.dropDatabase(db1, true);
+       }
+
+       /**
+        * Altering a database swaps its properties: afterwards the stored database no longer contains
+        * all of the original properties and does contain all of the replacement's properties.
+        */
+       @Test
+       public void testAlterDb() throws Exception {
+               CatalogDatabase original = createDb();
+               catalog.createDatabase(db1, original, false);
+
+               assertTrue(
+                       catalog.getDatabase(db1).getProperties().entrySet()
+                               .containsAll(original.getProperties().entrySet()));
+
+               CatalogDatabase replacement = createAnotherDb();
+               catalog.alterDatabase(db1, replacement, false);
+
+               assertFalse(
+                       catalog.getDatabase(db1).getProperties().entrySet()
+                               .containsAll(original.getProperties().entrySet()));
+               assertTrue(
+                       catalog.getDatabase(db1).getProperties().entrySet()
+                               .containsAll(replacement.getProperties().entrySet()));
+       }
+
+       /** Altering a missing database with {@code ignoreIfNotExists = false} must fail. */
+       @Test
+       public void testAlterDb_DatabaseNotExistException() throws Exception {
+               final String missingDb = "nonexistent";
+
+               exception.expect(DatabaseNotExistException.class);
+               exception.expectMessage("Database nonexistent does not exist in Catalog");
+
+               catalog.alterDatabase(missingDb, createDb(), false);
+       }
+
+       /**
+        * Altering a missing database with {@code ignoreIfNotExists = true} must not throw and must
+        * not create the database as a side effect.
+        */
+       @Test
+       public void testAlterDb_DatabaseNotExist_ignored() throws Exception {
+               final String missingDb = "nonexistent";
+
+               catalog.alterDatabase(missingDb, createDb(), true);
+
+               final boolean existsAfterAlter = catalog.databaseExists(missingDb);
+               assertFalse(existsAfterAlter);
+       }
+
+       /** {@code databaseExists} is false for an unknown name and true once the database is created. */
+       @Test
+       public void testDbExists() throws Exception {
+               final boolean unknownDbExists = catalog.databaseExists("nonexistent");
+               assertFalse(unknownDbExists);
+
+               catalog.createDatabase(db1, createDb(), false);
+
+               final boolean createdDbExists = catalog.databaseExists(db1);
+               assertTrue(createdDbExists);
+       }
+
+       // ------ utilities ------
+
+       /**
+        * Get the name of the built-in default database of the specific catalog implementation
+        * under test.
+        *
+        * @return The built-in default database name
+        */
+       public abstract String getBuiltInDefaultDatabase();
+
+       /**
+        * Create a CatalogDatabase instance by specific catalog implementation.
+        *
+        * <p>The tests in this class pass the result to {@code createDatabase} and
+        * {@code alterDatabase} and compare its properties against what the catalog returns.
+        *
+        * @return a CatalogDatabase instance
+        */
+       public abstract CatalogDatabase createDb();
+
+       /**
+        * Create another CatalogDatabase instance by specific catalog implementation.
+        *
+        * <p>{@code testAlterDb} expects that, after altering a database to this instance, the stored
+        * properties no longer contain all entries of {@link #createDb()}'s properties. The properties
+        * returned here therefore should not be a superset of those.
+        *
+        * @return another CatalogDatabase instance
+        */
+       public abstract CatalogDatabase createAnotherDb();
+
+       /**
+        * Create a CatalogTable instance by specific catalog implementation.
+        *
+        * <p>Used by {@code testDropDb_DatabaseNotEmptyException} to put a table into a database
+        * before trying to drop that database.
+        *
+        * @return a CatalogTable instance
+        */
+       public abstract CatalogTable createTable();
+}

Reply via email to