This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 1104f95  HUDI-180 : Adding support for hive registration using metastore along with JDBC
1104f95 is described below

commit 1104f9526fcd05270ed5cd084228f023f1a01c46
Author: Nishith Agarwal <nagar...@uber.com>
AuthorDate: Fri Aug 30 12:30:56 2019 -0700

    HUDI-180 : Adding support for hive registration using metastore along with JDBC
---
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |  40 ++---
 .../org/apache/hudi/hive/HoodieHiveClient.java     | 161 ++++++++++++++++-----
 .../org/apache/hudi/hive/HiveSyncToolTest.java     |  23 +++
 3 files changed, 173 insertions(+), 51 deletions(-)

diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 8296163..2fc3a2e 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -48,11 +48,10 @@ public class HiveSyncConfig implements Serializable {
       "--base-path"}, description = "Basepath of hoodie dataset to sync", required = true)
   public String basePath;
 
-  @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by",
-      required = false)
+  @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
   public List<String> partitionFields = new ArrayList<>();
 
-  @Parameter(names = "-partition-value-extractor", description = "Class which implements "
+  @Parameter(names = "--partition-value-extractor", description = "Class which implements "
       + "PartitionValueExtractor "
       + "to extract the partition "
       + "values from HDFS path")
@@ -74,9 +73,27 @@ public class HiveSyncConfig implements Serializable {
       + "org.apache.hudi input format.")
   public Boolean usePreApacheInputFormat = false;
 
+  @Parameter(names = {"--use-jdbc"}, description = "Use JDBC when connecting to Hive")
+  public Boolean useJdbc = true;
+
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
+  public static HiveSyncConfig copy(HiveSyncConfig cfg) {
+    HiveSyncConfig newConfig = new HiveSyncConfig();
+    newConfig.basePath = cfg.basePath;
+    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
+    newConfig.databaseName = cfg.databaseName;
+    newConfig.hivePass = cfg.hivePass;
+    newConfig.hiveUser = cfg.hiveUser;
+    newConfig.partitionFields = cfg.partitionFields;
+    newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
+    newConfig.jdbcUrl = cfg.jdbcUrl;
+    newConfig.tableName = cfg.tableName;
+    newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+    return newConfig;
+  }
+
   @Override
   public String toString() {
     return "HiveSyncConfig{"
@@ -89,22 +106,9 @@ public class HiveSyncConfig implements Serializable {
         + ", partitionFields=" + partitionFields
         + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
         + ", assumeDatePartitioning=" + assumeDatePartitioning
+        + ", usePreApacheInputFormat=" + usePreApacheInputFormat
+        + ", useJdbc=" + useJdbc
         + ", help=" + help
         + '}';
   }
-
-  public static HiveSyncConfig copy(HiveSyncConfig cfg) {
-    HiveSyncConfig newConfig = new HiveSyncConfig();
-    newConfig.basePath = cfg.basePath;
-    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-    newConfig.databaseName = cfg.databaseName;
-    newConfig.hivePass = cfg.hivePass;
-    newConfig.hiveUser = cfg.hiveUser;
-    newConfig.partitionFields = cfg.partitionFields;
-    newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
-    newConfig.jdbcUrl = cfg.jdbcUrl;
-    newConfig.tableName = cfg.tableName;
-    newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
-    return newConfig;
-  }
 }
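The new --use-jdbc flag above defaults to true, so existing deployments keep the JDBC path unless they opt out. A minimal usage sketch, assuming the config is built in code (the path and table names here are hypothetical placeholders); note that copy() as added above does not carry useJdbc over:

    // Hypothetical sketch, not part of the commit: opting in to the metastore path.
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.basePath = "hdfs:///tmp/hoodie/sample_table";  // placeholder dataset path
    cfg.databaseName = "default";
    cfg.tableName = "sample_table";
    cfg.useJdbc = false;  // disable JDBC; registration then goes through the metastore client

    // copy() duplicates jdbcUrl, tableName, etc., but (as written above) not useJdbc,
    // so a copied config silently reverts to the JDBC default of true.
    HiveSyncConfig copied = HiveSyncConfig.copy(cfg);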
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 533c29b..771f45a 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -29,7 +29,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.jdbc.HiveDriver;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -86,6 +90,7 @@ public class HoodieHiveClient {
   private FileSystem fs;
   private Connection connection;
   private HoodieTimeline activeTimeline;
+  private HiveConf configuration;
 
   public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
     this.syncConfig = cfg;
@@ -93,8 +98,13 @@ public class HoodieHiveClient {
     this.metaClient = new HoodieTableMetaClient(fs.getConf(), cfg.basePath, true);
     this.tableType = metaClient.getTableType();
 
-    LOG.info("Creating hive connection " + cfg.jdbcUrl);
-    createHiveConnection();
+    this.configuration = configuration;
+    // Support both JDBC and metastore based implementations for backwards compatibility. Future users should
+    // disable jdbc and depend on the metastore client for all hive registrations.
+    if (cfg.useJdbc) {
+      LOG.info("Creating hive connection " + cfg.jdbcUrl);
+      createHiveConnection();
+    }
     try {
       this.client = new HiveMetaStoreClient(configuration);
     } catch (MetaException e) {
@@ -269,32 +279,59 @@ public class HoodieHiveClient {
    * Get the table schema
    */
   public Map<String, String> getTableSchema() {
-    if (!doesTableExist()) {
-      throw new IllegalArgumentException(
-          "Failed to get schema for table " + syncConfig.tableName + " does not exist");
-    }
-    Map<String, String> schema = Maps.newHashMap();
-    ResultSet result = null;
-    try {
-      DatabaseMetaData databaseMetaData = connection.getMetaData();
-      result = databaseMetaData
-          .getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
-      while (result.next()) {
-        String columnName = result.getString(4);
-        String columnType = result.getString(6);
-        if ("DECIMAL".equals(columnType)) {
-          int columnSize = result.getInt("COLUMN_SIZE");
-          int decimalDigits = result.getInt("DECIMAL_DIGITS");
-          columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+    if (syncConfig.useJdbc) {
+      if (!doesTableExist()) {
+        throw new IllegalArgumentException(
+            "Failed to get schema for table " + syncConfig.tableName + " does not exist");
+      }
+      Map<String, String> schema = Maps.newHashMap();
+      ResultSet result = null;
+      try {
+        DatabaseMetaData databaseMetaData = connection.getMetaData();
+        result = databaseMetaData
+            .getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
+        while (result.next()) {
+          String columnName = result.getString(4);
+          String columnType = result.getString(6);
+          if ("DECIMAL".equals(columnType)) {
+            int columnSize = result.getInt("COLUMN_SIZE");
+            int decimalDigits = result.getInt("DECIMAL_DIGITS");
+            columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+          }
+          schema.put(columnName, columnType);
         }
-        schema.put(columnName, columnType);
+        return schema;
+      } catch (SQLException e) {
+        throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName,
+            e);
+      } finally {
+        closeQuietly(result, null);
       }
+    } else {
+      return getTableSchemaUsingMetastoreClient();
+    }
+  }
+
+  public Map<String, String> getTableSchemaUsingMetastoreClient() {
+    try {
+      // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
+      // get the Schema of the table.
+      final long start = System.currentTimeMillis();
+      Table table = this.client.getTable(syncConfig.databaseName, syncConfig.tableName);
+      Map<String, String> partitionKeysMap = table.getPartitionKeys().stream()
+          .collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
+
+      Map<String, String> columnsMap = table.getSd().getCols().stream()
+          .collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
+
+      Map<String, String> schema = new HashMap<>();
+      schema.putAll(columnsMap);
+      schema.putAll(partitionKeysMap);
+      final long end = System.currentTimeMillis();
+      LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
       return schema;
-    } catch (SQLException e) {
-      throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName,
-          e);
-    } finally {
-      closeQuietly(result, null);
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
     }
   }
 
@@ -455,19 +492,71 @@ public class HoodieHiveClient {
    * @param s SQL to execute
    */
   public void updateHiveSQL(String s) {
-    Statement stmt = null;
+    if (syncConfig.useJdbc) {
+      Statement stmt = null;
+      try {
+        stmt = connection.createStatement();
+        LOG.info("Executing SQL " + s);
+        stmt.execute(s);
+      } catch (SQLException e) {
+        throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
+      } finally {
+        closeQuietly(null, stmt);
+      }
+    } else {
+      updateHiveSQLUsingHiveDriver(s);
+    }
+  }
+
+  /**
+   * Execute an update in hive using the Hive Driver.
+   *
+   * @param sql SQL statement to execute
+   */
+  public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) throws HoodieHiveSyncException {
+    List<CommandProcessorResponse> responses = updateHiveSQLs(Arrays.asList(sql));
+    return responses.get(responses.size() - 1);
+  }
+
+  private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) throws HoodieHiveSyncException {
+    SessionState ss = null;
+    org.apache.hadoop.hive.ql.Driver hiveDriver = null;
+    List<CommandProcessorResponse> responses = new ArrayList<>();
     try {
-      stmt = connection.createStatement();
-      LOG.info("Executing SQL " + s);
-      stmt.execute(s);
-    } catch (SQLException e) {
-      throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
+      final long startTime = System.currentTimeMillis();
+      ss = SessionState.start(configuration);
+      hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
+      final long endTime = System.currentTimeMillis();
+      LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
+      for (String sql : sqls) {
+        final long start = System.currentTimeMillis();
+        responses.add(hiveDriver.run(sql));
+        final long end = System.currentTimeMillis();
+        LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
+      }
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed in executing SQL", e);
     } finally {
-      closeQuietly(null, stmt);
+      if (ss != null) {
+        try {
+          ss.close();
+        } catch (IOException ie) {
+          LOG.error("Error while closing SessionState", ie);
+        }
+      }
+      if (hiveDriver != null) {
+        try {
+          hiveDriver.close();
+        } catch (Exception e) {
+          LOG.error("Error while closing hiveDriver", e);
+        }
+      }
     }
+    return responses;
   }
+
   private void createHiveConnection() {
     if (connection == null) {
       try {
@@ -505,6 +594,11 @@ public class HoodieHiveClient {
       if (stmt != null) {
         stmt.close();
       }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
       if (resultSet != null) {
         resultSet.close();
       }
@@ -544,6 +638,7 @@ public class HoodieHiveClient {
       }
       if (client != null) {
         client.close();
+        client = null;
       }
     } catch (SQLException e) {
       LOG.error("Could not close connection ", e);
@@ -622,4 +717,4 @@ public class HoodieHiveClient {
       return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
     }
   }
-}
+}
\ No newline at end of file
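With the client changes above, every registration step flows through one of two paths: SQL over the JDBC connection when useJdbc is set, or the embedded Hive Driver (SessionState plus org.apache.hadoop.hive.ql.Driver) otherwise, while schema reads use either JDBC DatabaseMetaData or the metastore client. A rough sketch of driving both through the public API, assuming cfg, hiveConf and fs are already constructed (the ALTER statement is a hypothetical example):

    // Hypothetical sketch, not part of the commit.
    HoodieHiveClient hiveClient = new HoodieHiveClient(cfg, hiveConf, fs);

    // Dispatches to JDBC or to the Hive Driver based on cfg.useJdbc.
    hiveClient.updateHiveSQL("ALTER TABLE default.sample_table SET TBLPROPERTIES ('note' = 'example')");

    // Likewise reads the schema via JDBC metadata, or via getTableSchemaUsingMetastoreClient().
    Map<String, String> schema = hiveClient.getTableSchema();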
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java b/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
index 2947d8b..c1ef756 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hudi.common.util.Option;
@@ -39,10 +41,25 @@ import org.apache.parquet.schema.Types;
 import org.joda.time.DateTime;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 @SuppressWarnings("ConstantConditions")
+@RunWith(Parameterized.class)
 public class HiveSyncToolTest {
 
+  // Test sync tool using both jdbc and metastore client
+  private boolean useJdbc;
+
+  public HiveSyncToolTest(Boolean useJdbc) {
+    this.useJdbc = useJdbc;
+  }
+
+  @Parameterized.Parameters(name = "UseJdbc")
+  public static Collection<Boolean[]> data() {
+    return Arrays.asList(new Boolean[][]{{false}, {true}});
+  }
+
   @Before
   public void setUp() throws IOException, InterruptedException, URISyntaxException {
     TestUtil.setUp();
@@ -146,6 +163,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testBasicSync() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     TestUtil.createCOWDataset(commitTime, 5);
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -168,6 +186,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testSyncIncremental() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime1 = "100";
     TestUtil.createCOWDataset(commitTime1, 5);
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -211,6 +230,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testSyncIncrementalWithSchemaEvolution() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime1 = "100";
     TestUtil.createCOWDataset(commitTime1, 5);
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -247,6 +267,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testSyncMergeOnRead() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     String deltaCommitTime = "101";
     TestUtil.createMORDataset(commitTime, deltaCommitTime, 5);
@@ -295,6 +316,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testSyncMergeOnReadRT() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     String deltaCommitTime = "101";
     String roTablename = TestUtil.hiveSyncConfig.tableName;
@@ -350,6 +372,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testMultiPartitionKeySync() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     TestUtil.createCOWDataset(commitTime, 5);
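Since the test class is now parameterized, every @Test above executes twice, once with useJdbc = false and once with true. A new test added to this class should follow the same first-line pattern so it covers both registration paths, roughly:

    // Hypothetical template for future tests; testSomething is a placeholder name.
    @Test
    public void testSomething() throws Exception {
      TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;  // pick up the parameterized mode
      // ... create a dataset and assert sync behavior, as in the tests above ...
    }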