This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 39d8236 [FLINK-12238][hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module 39d8236 is described below commit 39d82368eca3891d27c85dd9bc56db2344ee73ba Author: Bowen L <bowenl...@gmail.com> AuthorDate: Tue Apr 30 01:19:39 2019 -0700 [FLINK-12238][hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module This closes #8205 --- flink-connectors/flink-connector-hive/pom.xml | 429 +++++++++++++++++++++ .../catalog/hive/GenericHiveMetastoreCatalog.java | 352 +++++++++++++++++ .../hive/GenericHiveMetastoreCatalogUtil.java | 49 +++ .../src/main/resources/META-INF/NOTICE | 26 ++ .../main/resources/META-INF/licenses/LICENSE.antlr | 38 ++ .../hive/GenericHiveMetastoreCatalogTest.java | 83 ++++ .../flink/table/catalog/hive/HiveTestUtils.java | 54 +++ .../src/test/resources/hive-site.xml | 42 ++ .../src/test/resources/log4j-test.properties | 24 ++ flink-connectors/pom.xml | 1 + flink-table/flink-table-api-java/pom.xml | 10 + .../table/catalog/GenericCatalogDatabase.java | 2 +- .../table/catalog/GenericInMemoryCatalogTest.java | 248 +++--------- .../flink/table/catalog/CatalogTestBase.java | 241 ++++++++++++ 14 files changed, 1396 insertions(+), 203 deletions(-) diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml new file mode 100644 index 0000000..cb09934 --- /dev/null +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -0,0 +1,429 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> + <name>flink-connector-hive</name> + + <packaging>jar</packaging> + + <properties> + <hive.version>2.3.4</hive.version> + <hivemetastore.hadoop.version>2.7.2</hivemetastore.hadoop.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Hadoop dependency --> + <!-- Hadoop as provided dependencies, so we can depend on them without pulling in Hadoop --> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hivemetastore.hadoop.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + 
<version>${hivemetastore.hadoop.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Hive dependencies --> + <!-- Note: Hive published jars do not have proper dependencies declared. + We need to push for HIVE-16391 (https://issues.apache.org/jira/browse/HIVE-16391) to resolve this problem. --> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + <version>${hive.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-shims</artifactId> + </exclusion> + <exclusion> + <groupId>javolution</groupId> + <artifactId>javolution</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>com.zaxxer</groupId> + <artifactId>HikariCP</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jdo</groupId> + <artifactId>jdo-api</artifactId> + </exclusion> + <exclusion> + <groupId>co.cask.tephra</groupId> + <artifactId>tephra-api</artifactId> + </exclusion> + <exclusion> + <groupId>co.cask.tephra</groupId> + <artifactId>tephra-core</artifactId> + </exclusion> + <exclusion> + <groupId>co.cask.tephra</groupId> + <artifactId>tephra-hbase-compat-1.0</artifactId> + </exclusion> + <exclusion> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libfb303</artifactId> + </exclusion> + <exclusion> + <groupId>javax.transaction</groupId> + <artifactId>transaction-api</artifactId> 
+ </exclusion> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </exclusion> + <exclusion> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.ant</groupId> + <artifactId>ant</artifactId> + </exclusion> + <exclusion> + <groupId>com.tdunning</groupId> + <artifactId>json</artifactId> + </exclusion> + <exclusion> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty.aggregate</groupId> + <artifactId>jetty-all</artifactId> + </exclusion> + <exclusion> + <groupId>org.eclipse.jetty.orbit</groupId> + <artifactId>javax.servlet</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-web</artifactId> + </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-jvm</artifactId> + </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>com.github.joshelser</groupId> + <artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId> + </exclusion> + + <!-- org.apache.hive:hive-service-rpc --> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + 
<artifactId>httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </exclusion> + + <!-- org.apache.hive:hive-serde --> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.opencsv</groupId> + <artifactId>opencsv</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop-bundle</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-vector-code-gen</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-llap-tez</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-shims</artifactId> + </exclusion> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>ST4</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.ant</groupId> + <artifactId>ant</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + 
<artifactId>commons-compress</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.ivy</groupId> + <artifactId>ivy</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.curator</groupId> + <artifactId>apache-curator</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy-all</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-druid</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.calcite.avatica</groupId> + <artifactId>avatica</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </exclusion> + <exclusion> + <groupId>stax</groupId> + <artifactId>stax-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.10.2.0</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + 
<configuration> + <shadeTestJar>false</shadeTestJar> + <artifactSet> + <!-- Needs end-to-end tests to ensure the built flink-connector-hive jar contains all required dependencies and can run --> + <includes> + <include>commons-dbcp:commons-dbcp</include> + <include>commons-pool:commons-pool</include> + <include>commons-beanutils:commons-beanutils</include> + <include>com.jolbox:bonecp</include> + <include>org.apache.hive:*</include> + <include>org.apache.thrift:libthrift</include> + <include>org.datanucleus:*</include> + <include>org.antlr:antlr-runtime</include> + </includes> + </artifactSet> + <promoteTransitiveDependencies>true</promoteTransitiveDependencies> + <!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE --> + <filters> + <filter> + <!-- some dependencies bring their own LICENSE.txt which we don't need --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/LICENSE.txt</exclude> + <exclude>META-INF/ASM_LICENSE.txt</exclude> + <exclude>META-INF/ASL2.0</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Configure derby.log of embedded Hive metastore for unit tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + <systemPropertyVariables> + <derby.stream.error.file>${project.build.directory}/derby.log</derby.stream.error.file> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java new file mode 100644 index 0000000..4c07938 --- /dev/null +++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.GenericCatalogDatabase; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ReadableWritableCatalog; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage. 
+ */ +public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog { + private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class); + + public static final String DEFAULT_DB = "default"; + + private final String catalogName; + private final HiveConf hiveConf; + + private String currentDatabase = DEFAULT_DB; + private IMetaStoreClient client; + + public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, getHiveConf(hivemetastoreURI)); + } + + public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + this.catalogName = catalogName; + + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); + LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName); + } + + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + + // ------ databases ------ + + @Override + public String getCurrentDatabase() throws CatalogException { + 
return currentDatabase; + } + + @Override + public void setCurrentDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(catalogName, databaseName); + } + + currentDatabase = databaseName; + } + + @Override + public List<String> listDatabases() throws CatalogException { + try { + return client.getAllDatabases(); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to list all databases in %s", catalogName), e); + } + } + + @Override + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + Database hiveDb; + + try { + hiveDb = client.getDatabase(databaseName); + } catch (NoSuchObjectException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get database %s from %s", databaseName, catalogName), e); + } + + return new GenericCatalogDatabase(hiveDb.getParameters(), hiveDb.getDescription()); + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + try { + return client.getDatabase(databaseName) != null; + } catch (NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to determine whether database %s exists or not", databaseName), e); + } + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException { + + try { + client.createDatabase(GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, database)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(catalogName, name); + } + } catch (TException e) { + throw new 
CatalogException(String.format("Failed to create database %s", name), e); + } + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + try { + client.dropDatabase(name, true, ignoreIfNotExists); + } catch (NoSuchObjectException e) { + if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(catalogName, name); + } + } catch (InvalidOperationException e) { + if (e.getMessage().startsWith(String.format("Database %s is not empty", name))) { + throw new DatabaseNotEmptyException(catalogName, name); + } else { + throw new CatalogException(String.format("Failed to drop database %s", name), e); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to drop database %s", name), e); + } + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException { + try { + if (databaseExists(name)) { + client.alterDatabase(name, GenericHiveMetastoreCatalogUtil.createHiveDatabase(name, newDatabase)); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(catalogName, name); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to alter database %s", name), e); + } + } + + // ------ tables and views------ + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<String> listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tableExists(ObjectPath objectPath) throws CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ partitions ------ + + @Override + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException { + throw new 
UnsupportedOperationException(); + } + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ functions ------ + + @Override + public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) throws 
FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java new file mode 100644 index 0000000..779905a --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; + +import org.apache.hadoop.hive.metastore.api.Database; + +import java.util.Map; + + +/** + * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. 
+ */ +public class GenericHiveMetastoreCatalogUtil { + + private GenericHiveMetastoreCatalogUtil() { + } + + // ------ Utils ------ + + /** + * Creates a Hive database from CatalogDatabase. + */ + public static Database createHiveDatabase(String dbName, CatalogDatabase db) { + Map<String, String> props = db.getProperties(); + return new Database( + dbName, + db.getDescription().isPresent() ? db.getDescription().get() : null, + null, + props); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..ea1c160 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/NOTICE @@ -0,0 +1,26 @@ +flink-connector-hive +Copyright 2014-2019 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- commons-dbcp:commons-dbcp:1.4 +- commons-pool:commons-pool:1.5.4 +- com.jolbox:bonecp:0.8.0.RELEASE +- org.apache.hive:hive-common:2.3.4 +- org.apache.hive:hive-metastore:2.3.4 +- org.apache.hive:hive-serde:2.3.4 +- org.apache.hive:hive-service-rpc:2.3.4 +- org.apache.hive:hive-storage-api:2.4.0 +- org.apache.thrift:libthrift:0.9.3 +- org.datanucleus:datanucleus-api-jdo:4.2.4 +- org.datanucleus:datanucleus-core:4.1.17 +- org.datanucleus:datanucleus-rdbms:4.1.19 +- org.datanucleus:javax.jdo:3.2.0-m3 + +This project bundles the following dependencies under the BSD license. +See bundled license files for details. 
+ +- org.antlr:antlr-runtime:3.5.2 diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr new file mode 100644 index 0000000..0af2cce --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/licenses/LICENSE.antlr @@ -0,0 +1,38 @@ +(BSD License: http://www.opensource.org/licenses/bsd-license) + +Copyright (c) 2012 Terence Parr and Sam Harwell +All rights reserved. + +Redistribution and use in source and binary forms, with or +without modification, are permitted provided that the +following conditions are met: + +* Redistributions of source code must retain the above + copyright notice, this list of conditions and the + following disclaimer. + +* Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the + following disclaimer in the documentation and/or other + materials provided with the distribution. + +* Neither the name of the Webbit nor the names of + its contributors may be used to endorse or promote products + derived from this software without specific prior written + permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND +CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. 
+ diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java new file mode 100644 index 0000000..642c1c2 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTestBase; +import org.apache.flink.table.catalog.GenericCatalogDatabase; + +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; + +/** + * Test for GenericHiveMetastoreCatalog. 
+ */ +public class GenericHiveMetastoreCatalogTest extends CatalogTestBase { + + @BeforeClass + public static void init() throws IOException { + catalog = HiveTestUtils.createGenericHiveMetastoreCatalog(); + catalog.open(); + } + + // ===================== + // GenericHiveMetastoreCatalog doesn't support table operation yet + // Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against GenericHiveMetastoreCatalog + // ===================== + + // TODO: re-enable this test once GenericHiveMetastoreCatalog support table operations + @Test + public void testDropDb_DatabaseNotEmptyException() throws Exception { + } + + // ------ utils ------ + + @Override + public String getBuiltInDefaultDatabase() { + return GenericHiveMetastoreCatalog.DEFAULT_DB; + } + + @Override + public CatalogDatabase createDb() { + return new GenericCatalogDatabase( + new HashMap<String, String>() {{ + put("k1", "v1"); + }}, + TEST_COMMENT); + } + + @Override + public CatalogDatabase createAnotherDb() { + return new GenericCatalogDatabase( + new HashMap<String, String>() {{ + put("k2", "v2"); + }}, + TEST_COMMENT); + } + + @Override + public CatalogTable createTable() { + // TODO: implement this once GenericHiveMetastoreCatalog support table operations + return null; + } +} diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java new file mode 100644 index 0000000..83f5bed --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; + +/** + * Test utils for Hive connector. + */ +public class HiveTestUtils { + private static final String HIVE_SITE_XML = "hive-site.xml"; + private static final String HIVE_WAREHOUSE_URI_FORMAT = "jdbc:derby:;databaseName=%s;create=true"; + private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + /** + * Create a GenericHiveMetastoreCatalog with an embedded Hive Metastore. 
+ */ + public static GenericHiveMetastoreCatalog createGenericHiveMetastoreCatalog() throws IOException { + return new GenericHiveMetastoreCatalog("test", getHiveConf()); + } + + private static HiveConf getHiveConf() throws IOException { + ClassLoader classLoader = new HiveTestUtils().getClass().getClassLoader(); + HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML)); + + TEMPORARY_FOLDER.create(); + String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db"; + String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir); + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath()); + hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri); + + return hiveConf; + } +} diff --git a/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml new file mode 100644 index 0000000..c83bab8 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/resources/hive-site.xml @@ -0,0 +1,42 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration> + + <property> + <name>hive.metastore.schema.verification</name> + <value>false</value> + </property> + + <property> + <name>hive.metastore.client.capability.check</name> + <value>false</value> + </property> + + <property> + <name>datanucleus.schema.autoCreateTables</name> + <value>true</value> + </property> + + <property> + <name>datanucleus.schema.autoCreateAll</name> + <value>true</value> + </property> + +</configuration> diff --git a/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..fcd8654 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/resources/log4j-test.properties @@ -0,0 +1,24 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 92058fa..e6d601d 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -49,6 +49,7 @@ under the License. <module>flink-connector-elasticsearch2</module> <module>flink-connector-elasticsearch5</module> <module>flink-connector-elasticsearch6</module> + <module>flink-connector-hive</module> <module>flink-connector-rabbitmq</module> <module>flink-connector-twitter</module> <module>flink-connector-nifi</module> diff --git a/flink-table/flink-table-api-java/pom.xml b/flink-table/flink-table-api-java/pom.xml index a6bf656..f8a8fd7 100644 --- a/flink-table/flink-table-api-java/pom.xml +++ b/flink-table/flink-table-api-java/pom.xml @@ -42,5 +42,15 @@ under the License. 
<artifactId>flink-table-common</artifactId> <version>${project.version}</version> </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java index 5f2c732..959247a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java @@ -38,7 +38,7 @@ public class GenericCatalogDatabase implements CatalogDatabase { public GenericCatalogDatabase(Map<String, String> properties, String comment) { this(properties); - this.comment = comment; + this.comment = checkNotNull(comment, "comment cannot be null"); } public Map<String, String> getProperties() { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java index d96d8b5..50d203a 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java @@ -21,8 +21,6 @@ package org.apache.flink.table.catalog; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; -import 
org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; @@ -35,10 +33,8 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.functions.ScalarFunction; import org.junit.After; -import org.junit.Before; -import org.junit.Rule; +import org.junit.BeforeClass; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.util.Arrays; import java.util.HashMap; @@ -55,37 +51,14 @@ import static org.junit.Assert.assertTrue; /** * Test for GenericInMemoryCatalog. */ -public class GenericInMemoryCatalogTest { - private static final String IS_STREAMING = "is_streaming"; - - private final String testCatalogName = "test-catalog"; - private final String db1 = "db1"; - private final String db2 = "db2"; - private final String nonExistantDatabase = "non-existant-db"; - - private final String t1 = "t1"; - private final String t2 = "t2"; - private final ObjectPath path1 = new ObjectPath(db1, t1); - private final ObjectPath path2 = new ObjectPath(db2, t2); - private final ObjectPath path3 = new ObjectPath(db1, t2); - private final ObjectPath path4 = new ObjectPath(db1, "t3"); - private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist"); - private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist"); - - private static final String TEST_COMMENT = "test comment"; - private static final String TABLE_COMMENT = "This is my batch table"; - - private static ReadableWritableCatalog catalog; - - @Before - public void setUp() { - catalog = new GenericInMemoryCatalog(testCatalogName); +public class GenericInMemoryCatalogTest extends CatalogTestBase { + + @BeforeClass + public static void init() { + catalog = new 
GenericInMemoryCatalog(TEST_CATALOG_NAME); catalog.open(); } - @Rule - public ExpectedException exception = ExpectedException.none(); - @After public void close() throws Exception { if (catalog.tableExists(path1)) { @@ -100,13 +73,6 @@ public class GenericInMemoryCatalogTest { if (catalog.functionExists(path1)) { catalog.dropFunction(path1, true); } - if (catalog.databaseExists(db1)) { - catalog.dropDatabase(db1, true); - } - if (catalog.databaseExists(db2)) { - catalog.dropDatabase(db2, true); - } - catalog.close(); } // ------ tables ------ @@ -486,136 +452,6 @@ public class GenericInMemoryCatalogTest { assertEquals(Arrays.asList(path1.getObjectName()), catalog.listViews(db1)); } - // ------ databases ------ - - @Test - public void testCreateDb() throws Exception { - catalog.createDatabase(db2, createDb(), false); - - assertEquals(2, catalog.listDatabases().size()); - } - - @Test - public void testSetCurrentDatabase() throws Exception { - assertEquals(GenericInMemoryCatalog.DEFAULT_DB, catalog.getCurrentDatabase()); - catalog.createDatabase(db2, createDb(), true); - catalog.setCurrentDatabase(db2); - assertEquals(db2, catalog.getCurrentDatabase()); - catalog.setCurrentDatabase(GenericInMemoryCatalog.DEFAULT_DB); - assertEquals(GenericInMemoryCatalog.DEFAULT_DB, catalog.getCurrentDatabase()); - catalog.dropDatabase(db2, false); - } - - @Test - public void testSetCurrentDatabaseNegative() throws Exception { - exception.expect(DatabaseNotExistException.class); - exception.expectMessage("Database " + this.nonExistantDatabase + " does not exist in Catalog"); - catalog.setCurrentDatabase(this.nonExistantDatabase); - } - - @Test - public void testCreateDb_DatabaseAlreadyExistException() throws Exception { - catalog.createDatabase(db1, createDb(), false); - - exception.expect(DatabaseAlreadyExistException.class); - exception.expectMessage("Database db1 already exists in Catalog"); - catalog.createDatabase(db1, createDb(), false); - } - - @Test - public void 
testCreateDb_DatabaseAlreadyExist_ignored() throws Exception { - CatalogDatabase cd1 = createDb(); - catalog.createDatabase(db1, cd1, false); - List<String> dbs = catalog.listDatabases(); - - assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet())); - assertEquals(2, dbs.size()); - assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs)); - - catalog.createDatabase(db1, createAnotherDb(), true); - - assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet())); - assertEquals(2, dbs.size()); - assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs)); - } - - @Test - public void testGetDb_DatabaseNotExistException() throws Exception { - exception.expect(DatabaseNotExistException.class); - exception.expectMessage("Database nonexistent does not exist in Catalog"); - catalog.getDatabase("nonexistent"); - } - - @Test - public void testDropDb() throws Exception { - catalog.createDatabase(db1, createDb(), false); - - assertTrue(catalog.listDatabases().contains(db1)); - - catalog.dropDatabase(db1, false); - - assertFalse(catalog.listDatabases().contains(db1)); - } - - @Test - public void testDropDb_DatabaseNotExistException() throws Exception { - exception.expect(DatabaseNotExistException.class); - exception.expectMessage("Database db1 does not exist in Catalog"); - catalog.dropDatabase(db1, false); - } - - @Test - public void testDropDb_DatabaseNotExist_Ignore() throws Exception { - catalog.dropDatabase(db1, true); - } - - @Test - public void testDropDb_databaseIsNotEmpty() throws Exception { - catalog.createDatabase(db1, createDb(), false); - catalog.createTable(path1, createTable(), false); - - exception.expect(DatabaseNotEmptyException.class); - exception.expectMessage("Database db1 in Catalog test-catalog is not empty"); - catalog.dropDatabase(db1, true); - } - - @Test - public void 
testAlterDb() throws Exception { - CatalogDatabase db = createDb(); - catalog.createDatabase(db1, db, false); - - assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet())); - - CatalogDatabase newDb = createAnotherDb(); - catalog.alterDatabase(db1, newDb, false); - - assertFalse(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet())); - assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(newDb.getProperties().entrySet())); - } - - @Test - public void testAlterDb_DatabaseNotExistException() throws Exception { - exception.expect(DatabaseNotExistException.class); - exception.expectMessage("Database nonexistent does not exist in Catalog"); - catalog.alterDatabase("nonexistent", createDb(), false); - } - - @Test - public void testAlterDb_DatabaseNotExist_ignored() throws Exception { - catalog.alterDatabase("nonexistent", createDb(), true); - - assertFalse(catalog.databaseExists("nonexistent")); - } - - @Test - public void testDbExists() throws Exception { - assertFalse(catalog.databaseExists("nonexistent")); - - catalog.createDatabase(db1, createDb(), false); - - assertTrue(catalog.databaseExists(db1)); - } - @Test public void testRenameView() throws Exception { catalog.createDatabase("db1", new GenericCatalogDatabase(new HashMap<>()), false); @@ -660,7 +496,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionSpecInvalidException.class); exception.expectMessage( String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s", - invalid, table.getPartitionKeys(), path1.getFullName(), testCatalogName)); + invalid, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME)); catalog.listPartitions(path1, invalid); } @@ -670,7 +506,7 @@ public class GenericInMemoryCatalogTest { exception.expect(TableNotExistException.class); exception.expectMessage( - String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), testCatalogName)); + String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME)); catalog.createPartition(path1, createPartitionSpec(), createPartition(), false); } @@ -681,7 +517,7 @@ public class GenericInMemoryCatalogTest { exception.expect(TableNotPartitionedException.class); exception.expectMessage( - String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), testCatalogName)); + String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME)); catalog.createPartition(path1, createPartitionSpec(), createPartition(), false); } @@ -695,7 +531,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionSpecInvalidException.class); exception.expectMessage( String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.", - partitionSpec, table.getPartitionKeys(), path1.getFullName(), testCatalogName)); + partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME)); catalog.createPartition(path1, partitionSpec, createPartition(), false); } @@ -711,7 +547,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionAlreadyExistsException.class); exception.expectMessage( String.format("Partition %s of table %s in catalog %s already exists.", - partitionSpec, path1.getFullName(), testCatalogName)); + partitionSpec, path1.getFullName(), TEST_CATALOG_NAME)); catalog.createPartition(path1, partitionSpec, createPartition(), false); } @@ -744,7 +580,7 @@ public class GenericInMemoryCatalogTest { exception.expect(TableNotExistException.class); exception.expectMessage( - String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), testCatalogName)); + String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME)); catalog.dropPartition(path1, createPartitionSpec(), false); } @@ -755,7 +591,7 @@ 
public class GenericInMemoryCatalogTest { exception.expect(TableNotPartitionedException.class); exception.expectMessage( - String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), testCatalogName)); + String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME)); catalog.dropPartition(path1, createPartitionSpec(), false); } @@ -769,7 +605,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionSpecInvalidException.class); exception.expectMessage( String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.", - partitionSpec, table.getPartitionKeys(), path1.getFullName(), testCatalogName)); + partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME)); catalog.dropPartition(path1, partitionSpec, false); } @@ -781,7 +617,7 @@ public class GenericInMemoryCatalogTest { CatalogPartitionSpec partitionSpec = createPartitionSpec(); exception.expect(PartitionNotExistException.class); exception.expectMessage( - String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec, path1.getFullName(), testCatalogName)); + String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME)); catalog.dropPartition(path1, partitionSpec, false); } @@ -828,7 +664,7 @@ public class GenericInMemoryCatalogTest { CatalogPartitionSpec partitionSpec = createPartitionSpec(); exception.expect(TableNotExistException.class); exception.expectMessage( - String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), testCatalogName)); + String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME)); catalog.alterPartition(path1, partitionSpec, createPartition(), false); } @@ -840,7 +676,7 @@ public class GenericInMemoryCatalogTest { CatalogPartitionSpec partitionSpec = createPartitionSpec(); 
exception.expect(TableNotPartitionedException.class); exception.expectMessage( - String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), testCatalogName)); + String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME)); catalog.alterPartition(path1, partitionSpec, createPartition(), false); } @@ -854,7 +690,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionSpecInvalidException.class); exception.expectMessage( String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.", - partitionSpec, table.getPartitionKeys(), path1.getFullName(), testCatalogName)); + partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME)); catalog.alterPartition(path1, partitionSpec, createPartition(), false); } @@ -868,7 +704,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionNotExistException.class); exception.expectMessage( String.format("Partition %s of table %s in catalog %s does not exist.", - partitionSpec, path1.getFullName(), testCatalogName)); + partitionSpec, path1.getFullName(), TEST_CATALOG_NAME)); catalog.alterPartition(path1, partitionSpec, catalogPartition, false); } @@ -892,7 +728,7 @@ public class GenericInMemoryCatalogTest { exception.expect(TableNotPartitionedException.class); exception.expectMessage( - String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), testCatalogName)); + String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME)); catalog.getPartition(path1, createPartitionSpec()); } @@ -906,7 +742,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionSpecInvalidException.class); exception.expectMessage( String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.", - partitionSpec, table.getPartitionKeys(), path1.getFullName(), testCatalogName)); + partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), TEST_CATALOG_NAME)); catalog.getPartition(path1, partitionSpec); } @@ -924,7 +760,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionSpecInvalidException.class); exception.expectMessage( String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.", - partitionSpec, table.getPartitionKeys(), path1.getFullName(), testCatalogName)); + partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME)); catalog.getPartition(path1, partitionSpec); } @@ -937,7 +773,7 @@ public class GenericInMemoryCatalogTest { exception.expect(PartitionNotExistException.class); exception.expectMessage( String.format("Partition %s of table %s in catalog %s does not exist.", - partitionSpec, path1.getFullName(), testCatalogName)); + partitionSpec, path1.getFullName(), TEST_CATALOG_NAME)); catalog.getPartition(path1, partitionSpec); } @@ -1106,13 +942,33 @@ public class GenericInMemoryCatalogTest { // ------ utilities ------ + @Override + public String getBuiltInDefaultDatabase() { + return GenericInMemoryCatalog.DEFAULT_DB; + } + + @Override + public CatalogDatabase createDb() { + return new GenericCatalogDatabase(new HashMap<String, String>() {{ + put("k1", "v1"); + }}, TEST_COMMENT); + } + + @Override + public CatalogDatabase createAnotherDb() { + return new GenericCatalogDatabase(new HashMap<String, String>() {{ + put("k2", "v2"); + }}, "this is another database."); + } + private GenericCatalogTable createStreamingTable() { return CatalogTestUtil.createTable( createTableSchema(), getStreamingTableProperties(), TABLE_COMMENT); } - private GenericCatalogTable createTable() { + @Override + public GenericCatalogTable createTable() { return CatalogTestUtil.createTable( createTableSchema(), getBatchTableProperties(), TABLE_COMMENT); @@ -1186,12 +1042,6 @@ public class GenericInMemoryCatalogTest { return new GenericCatalogPartition(props); } - private CatalogDatabase createDb() { - return new 
GenericCatalogDatabase(new HashMap<String, String>() {{ - put("k1", "v1"); - }}, TEST_COMMENT); - } - private Map<String, String> getBatchTableProperties() { return new HashMap<String, String>() {{ put(IS_STREAMING, "false"); @@ -1204,12 +1054,6 @@ public class GenericInMemoryCatalogTest { }}; } - private CatalogDatabase createAnotherDb() { - return new GenericCatalogDatabase(new HashMap<String, String>() {{ - put("k2", "v2"); - }}, "this is another database."); - } - private TableSchema createTableSchema() { return new TableSchema( new String[] {"first", "second", "third"}, @@ -1235,7 +1079,7 @@ public class GenericInMemoryCatalogTest { private CatalogView createView() { return new GenericCatalogView( String.format("select * from %s", t1), - String.format("select * from %s.%s", testCatalogName, path1.getFullName()), + String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()), createTableSchema(), new HashMap<>(), "This is a view"); @@ -1244,7 +1088,7 @@ public class GenericInMemoryCatalogTest { private CatalogView createAnotherView() { return new GenericCatalogView( String.format("select * from %s", t2), - String.format("select * from %s.%s", testCatalogName, path2.getFullName()), + String.format("select * from %s.%s", TEST_CATALOG_NAME, path2.getFullName()), createTableSchema(), new HashMap<>(), "This is another view"); diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java new file mode 100644 index 0000000..83a8f2b --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.catalog;

import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Base testing class for unit tests of a specific catalog, like GenericInMemoryCatalog and HiveCatalog.
 *
 * <p>Subclasses are expected to assign {@link #catalog} before the tests run and to implement the
 * abstract factory methods at the bottom of this class for their catalog implementation.
 */
public abstract class CatalogTestBase {
	protected static final String IS_STREAMING = "is_streaming";

	// Database names used across tests. "db1"/"db2" are created and torn down per test;
	// nonExistentDatabase is never created and is used for negative tests.
	protected final String db1 = "db1";
	protected final String db2 = "db2";
	protected final String nonExistentDatabase = "non-existent-db";

	// Table names and object paths used by table-related tests.
	protected final String t1 = "t1";
	protected final String t2 = "t2";
	protected final ObjectPath path1 = new ObjectPath(db1, t1);
	protected final ObjectPath path2 = new ObjectPath(db2, t2);
	protected final ObjectPath path3 = new ObjectPath(db1, t2);
	protected final ObjectPath path4 = new ObjectPath(db1, "t3");
	protected final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
	protected final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");

	protected static final String TEST_CATALOG_NAME = "test-catalog";
	protected static final String TEST_COMMENT = "test comment";
	protected static final String TABLE_COMMENT = "This is my batch table";

	// The catalog under test; must be initialized by the concrete subclass (e.g. in a @BeforeClass).
	protected static ReadableWritableCatalog catalog;

	@Rule
	public ExpectedException exception = ExpectedException.none();

	/**
	 * Removes everything the tests may have created so each test starts from a clean catalog.
	 *
	 * <p>Tables are dropped before databases: testDropDb_DatabaseNotEmptyException leaves table
	 * {@code path1} behind in {@code db1}, and dropping a non-empty database would otherwise fail
	 * the teardown itself with DatabaseNotEmptyException.
	 */
	@After
	public void cleanup() throws Exception {
		// NOTE(review): assumes ReadableWritableCatalog exposes tableExists/dropTable symmetric to
		// createTable used below — confirm against the interface.
		if (catalog.tableExists(path1)) {
			catalog.dropTable(path1, true);
		}
		if (catalog.databaseExists(db1)) {
			catalog.dropDatabase(db1, true);
		}
		if (catalog.databaseExists(db2)) {
			catalog.dropDatabase(db2, true);
		}
	}

	@AfterClass
	public static void closeup() {
		catalog.close();
	}

	// ------ databases ------

	@Test
	public void testCreateDb() throws Exception {
		catalog.createDatabase(db2, createDb(), false);

		// the built-in default database plus the newly created one
		assertEquals(2, catalog.listDatabases().size());
	}

	@Test
	public void testSetCurrentDatabase() throws Exception {
		assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());
		catalog.createDatabase(db2, createDb(), true);
		catalog.setCurrentDatabase(db2);
		assertEquals(db2, catalog.getCurrentDatabase());
		catalog.setCurrentDatabase(getBuiltInDefaultDatabase());
		assertEquals(getBuiltInDefaultDatabase(), catalog.getCurrentDatabase());

		catalog.dropDatabase(db2, false);
	}

	@Test
	public void testSetCurrentDatabaseNegative() throws Exception {
		exception.expect(DatabaseNotExistException.class);
		exception.expectMessage("Database " + nonExistentDatabase + " does not exist in Catalog");
		catalog.setCurrentDatabase(nonExistentDatabase);
	}

	@Test
	public void testCreateDb_DatabaseAlreadyExistException() throws Exception {
		catalog.createDatabase(db1, createDb(), false);

		exception.expect(DatabaseAlreadyExistException.class);
		exception.expectMessage("Database db1 already exists in Catalog");
		catalog.createDatabase(db1, createDb(), false);
	}

	@Test
	public void testCreateDb_DatabaseAlreadyExist_ignored() throws Exception {
		CatalogDatabase cd1 = createDb();
		catalog.createDatabase(db1, cd1, false);
		List<String> dbs = catalog.listDatabases();

		assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
		assertEquals(2, dbs.size());
		assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));

		// creating the same database with ignoreIfExists=true must be a no-op
		catalog.createDatabase(db1, createAnotherDb(), true);

		// re-fetch the database list: asserting on the stale pre-call snapshot would not
		// detect a catalog that wrongly registered a second entry
		dbs = catalog.listDatabases();

		assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(cd1.getProperties().entrySet()));
		assertEquals(2, dbs.size());
		assertEquals(new HashSet<>(Arrays.asList(db1, catalog.getCurrentDatabase())), new HashSet<>(dbs));
	}

	@Test
	public void testGetDb_DatabaseNotExistException() throws Exception {
		exception.expect(DatabaseNotExistException.class);
		exception.expectMessage("Database nonexistent does not exist in Catalog");
		catalog.getDatabase("nonexistent");
	}

	@Test
	public void testDropDb() throws Exception {
		catalog.createDatabase(db1, createDb(), false);

		assertTrue(catalog.listDatabases().contains(db1));

		catalog.dropDatabase(db1, false);

		assertFalse(catalog.listDatabases().contains(db1));
	}

	@Test
	public void testDropDb_DatabaseNotExistException() throws Exception {
		exception.expect(DatabaseNotExistException.class);
		exception.expectMessage("Database db1 does not exist in Catalog");
		catalog.dropDatabase(db1, false);
	}

	@Test
	public void testDropDb_DatabaseNotExist_Ignore() throws Exception {
		catalog.dropDatabase(db1, true);
	}

	@Test
	public void testDropDb_DatabaseNotEmptyException() throws Exception {
		catalog.createDatabase(db1, createDb(), false);
		catalog.createTable(path1, createTable(), false);

		// ignoreIfNotExists=true does not suppress the non-empty check
		exception.expect(DatabaseNotEmptyException.class);
		exception.expectMessage("Database db1 in Catalog test-catalog is not empty");
		catalog.dropDatabase(db1, true);
	}

	@Test
	public void testAlterDb() throws Exception {
		CatalogDatabase db = createDb();
		catalog.createDatabase(db1, db, false);

		assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet()));

		CatalogDatabase newDb = createAnotherDb();
		catalog.alterDatabase(db1, newDb, false);

		// the old properties must be replaced by the new ones, not merged
		assertFalse(catalog.getDatabase(db1).getProperties().entrySet().containsAll(db.getProperties().entrySet()));
		assertTrue(catalog.getDatabase(db1).getProperties().entrySet().containsAll(newDb.getProperties().entrySet()));
	}

	@Test
	public void testAlterDb_DatabaseNotExistException() throws Exception {
		exception.expect(DatabaseNotExistException.class);
		exception.expectMessage("Database nonexistent does not exist in Catalog");
		catalog.alterDatabase("nonexistent", createDb(), false);
	}

	@Test
	public void testAlterDb_DatabaseNotExist_ignored() throws Exception {
		catalog.alterDatabase("nonexistent", createDb(), true);

		assertFalse(catalog.databaseExists("nonexistent"));
	}

	@Test
	public void testDbExists() throws Exception {
		assertFalse(catalog.databaseExists("nonexistent"));

		catalog.createDatabase(db1, createDb(), false);

		assertTrue(catalog.databaseExists(db1));
	}

	// ------ utilities ------

	/**
	 * Get the built-in default database of the specific catalog implementation.
	 *
	 * @return The built-in default database name
	 */
	public abstract String getBuiltInDefaultDatabase();

	/**
	 * Create a CatalogDatabase instance by specific catalog implementation.
	 *
	 * @return a CatalogDatabase instance
	 */
	public abstract CatalogDatabase createDb();

	/**
	 * Create another CatalogDatabase instance by specific catalog implementation.
	 *
	 * <p>Its properties must differ from those of {@link #createDb()} so that alter/ignored-create
	 * tests can tell the two apart.
	 *
	 * @return another CatalogDatabase instance
	 */
	public abstract CatalogDatabase createAnotherDb();

	/**
	 * Create a CatalogTable instance by specific catalog implementation.
	 *
	 * @return a CatalogTable instance
	 */
	public abstract CatalogTable createTable();
}