This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

The following commit(s) were added to refs/heads/redo-log by this push:
     new ac105e6  [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
ac105e6 is described below

commit ac105e6d9b6dcd75f8145af6ce03600af40180e0
Author: lamber-ken <lamber...@163.com>
AuthorDate: Fri Jan 10 09:38:34 2020 +0800

    [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
---
 hudi-hive/pom.xml                                   |  5 +++
 .../java/org/apache/hudi/hive/HiveSyncTool.java     | 30 +++++++--------
 .../org/apache/hudi/hive/HoodieHiveClient.java      | 44 +++++++++++-----------
 .../java/org/apache/hudi/hive/util/SchemaUtil.java  | 12 +++---
 .../org/apache/hudi/hive/util/HiveTestService.java  | 10 ++---
 5 files changed, 53 insertions(+), 48 deletions(-)

diff --git a/hudi-hive/pom.xml b/hudi-hive/pom.xml
index c552b70..1ab2533 100644
--- a/hudi-hive/pom.xml
+++ b/hudi-hive/pom.xml
@@ -49,6 +49,11 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.parquet</groupId>
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 6bcb697..4029096 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.schema.MessageType;
 
 import java.util.List;
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
 @SuppressWarnings("WeakerAccess")
 public class HiveSyncTool {
 
-  private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class);
   private final HoodieHiveClient hoodieHiveClient;
   public static final String SUFFIX_REALTIME_TABLE = "_rt";
   private final HiveSyncConfig cfg;
@@ -79,7 +79,7 @@ public class HiveSyncTool {
           cfg.tableName = originalTableName;
           break;
         default:
-          LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+          LOG.error("Unknown table type {}", hoodieHiveClient.getTableType());
           throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
       }
     } catch (RuntimeException re) {
@@ -90,8 +90,8 @@ public class HiveSyncTool {
   }
 
   private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException {
-    LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path " + hoodieHiveClient.getBasePath()
-        + " of type " + hoodieHiveClient.getTableType());
+    LOG.info("Trying to sync hoodie table {} with base path {} of type {}",
+        cfg.tableName, hoodieHiveClient.getBasePath(), hoodieHiveClient.getTableType());
 
     // Check if the necessary table exists
     boolean tableExists = hoodieHiveClient.doesTableExist();
@@ -100,20 +100,20 @@ public class HiveSyncTool {
 
     // Sync schema if needed
     syncSchema(tableExists, isRealTime, schema);
-    LOG.info("Schema sync complete. Syncing partitions for " + cfg.tableName);
+    LOG.info("Schema sync complete. Syncing partitions for {}", cfg.tableName);
     // Get the last time we successfully synced partitions
     Option<String> lastCommitTimeSynced = Option.empty();
     if (tableExists) {
       lastCommitTimeSynced = hoodieHiveClient.getLastCommitTimeSynced();
     }
-    LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
+    LOG.info("Last commit time synced was found to be {}", lastCommitTimeSynced.orElse("null"));
     List<String> writtenPartitionsSince = hoodieHiveClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
-    LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
+    LOG.info("Storage partitions scan complete. Found {}", writtenPartitionsSince.size());
     // Sync the partitions if needed
     syncPartitions(writtenPartitionsSince);
 
     hoodieHiveClient.updateLastCommitTimeSynced();
-    LOG.info("Sync complete for " + cfg.tableName);
+    LOG.info("Sync complete for {}", cfg.tableName);
   }
 
   /**
@@ -126,7 +126,7 @@ public class HiveSyncTool {
   private void syncSchema(boolean tableExists, boolean isRealTime, MessageType schema) throws ClassNotFoundException {
     // Check and sync schema
     if (!tableExists) {
-      LOG.info("Table " + cfg.tableName + " is not found. Creating it");
+      LOG.info("Table {} is not found. Creating it", cfg.tableName);
       if (!isRealTime) {
         // TODO - RO Table for MOR only after major compaction (UnboundedCompaction is default
         // for now)
@@ -150,10 +150,10 @@ public class HiveSyncTool {
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema();
       SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
       if (!schemaDiff.isEmpty()) {
-        LOG.info("Schema difference found for " + cfg.tableName);
+        LOG.info("Schema difference found for {}", cfg.tableName);
         hoodieHiveClient.updateTableDefinition(schema);
       } else {
-        LOG.info("No Schema difference for " + cfg.tableName);
+        LOG.info("No Schema difference for {}", cfg.tableName);
       }
     }
   }
@@ -168,10 +168,10 @@ public class HiveSyncTool {
      List<PartitionEvent> partitionEvents = hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
      List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
-      LOG.info("New Partitions " + newPartitions);
+      LOG.info("New Partitions {}", newPartitions);
       hoodieHiveClient.addPartitionsToTable(newPartitions);
 
       List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
-      LOG.info("Changed Partitions " + updatePartitions);
+      LOG.info("Changed Partitions {}", updatePartitions);
       hoodieHiveClient.updatePartitionsToTable(updatePartitions);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to sync partitions for table " + cfg.tableName, e);
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index d176500..820e59b 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -48,8 +48,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.jdbc.HiveDriver;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -87,7 +87,7 @@ public class HoodieHiveClient {
     }
   }
 
-  private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieHiveClient.class);
   private final HoodieTableMetaClient metaClient;
   private final HoodieTableType tableType;
   private final PartitionValueExtractor partitionValueExtractor;
@@ -108,7 +108,7 @@ public class HoodieHiveClient {
     // Support both JDBC and metastore based implementations for backwards compatiblity. Future users should
    // disable jdbc and depend on metastore client for all hive registrations
     if (cfg.useJdbc) {
-      LOG.info("Creating hive connection " + cfg.jdbcUrl);
+      LOG.info("Creating hive connection {}", cfg.jdbcUrl);
       createHiveConnection();
     }
     try {
@@ -137,10 +137,10 @@ public class HoodieHiveClient {
    */
   void addPartitionsToTable(List<String> partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + syncConfig.tableName);
+      LOG.info("No partitions to add for {}", syncConfig.tableName);
       return;
     }
-    LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + syncConfig.tableName);
+    LOG.info("Adding partitions {} to table {}", partitionsToAdd.size(), syncConfig.tableName);
     String sql = constructAddPartitions(partitionsToAdd);
     updateHiveSQL(sql);
   }
@@ -150,10 +150,10 @@ public class HoodieHiveClient {
    */
   void updatePartitionsToTable(List<String> changedPartitions) {
     if (changedPartitions.isEmpty()) {
-      LOG.info("No partitions to change for " + syncConfig.tableName);
+      LOG.info("No partitions to change for {}", syncConfig.tableName);
       return;
     }
-    LOG.info("Changing partitions " + changedPartitions.size() + " on " + syncConfig.tableName);
+    LOG.info("Changing partitions {} on {}", changedPartitions.size(), syncConfig.tableName);
     List<String> sqls = constructChangePartitions(changedPartitions);
     for (String sql : sqls) {
       updateHiveSQL(sql);
@@ -260,7 +260,7 @@ public class HoodieHiveClient {
           .append(HIVE_ESCAPE_CHARACTER).append(syncConfig.tableName)
           .append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
           .append(newSchemaStr).append(" )").append(cascadeClause);
-      LOG.info("Updating table definition with " + sqlBuilder);
+      LOG.info("Updating table definition with {}", sqlBuilder);
       updateHiveSQL(sqlBuilder.toString());
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to update table for " + syncConfig.tableName, e);
@@ -271,7 +271,7 @@ public class HoodieHiveClient {
     try {
       String createSQLQuery =
           SchemaUtil.generateCreateDDL(storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass);
-      LOG.info("Creating table with " + createSQLQuery);
+      LOG.info("Creating table with {}", createSQLQuery);
       updateHiveSQL(createSQLQuery);
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to create table " + syncConfig.tableName, e);
@@ -329,7 +329,7 @@ public class HoodieHiveClient {
       schema.putAll(columnsMap);
       schema.putAll(partitionKeysMap);
       final long end = System.currentTimeMillis();
-      LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
+      LOG.info("Time taken to getTableSchema: {} ms", (end - start));
       return schema;
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
@@ -364,7 +364,7 @@ public class HoodieHiveClient {
     // Get a datafile written and get the schema from that file
     Option<HoodieInstant> lastCompactionCommit =
         metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
-    LOG.info("Found the last compaction commit as " + lastCompactionCommit);
+    LOG.info("Found the last compaction commit as {}", lastCompactionCommit);
 
     Option<HoodieInstant> lastDeltaCommit;
     if (lastCompactionCommit.isPresent()) {
@@ -374,7 +374,7 @@ public class HoodieHiveClient {
       lastDeltaCommit =
           metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
     }
-    LOG.info("Found the last delta commit " + lastDeltaCommit);
+    LOG.info("Found the last delta commit {}", lastDeltaCommit);
 
     if (lastDeltaCommit.isPresent()) {
       HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
@@ -407,7 +407,7 @@ public class HoodieHiveClient {
           return readSchemaFromLastCompaction(lastCompactionCommit);
         }
         default:
-          LOG.error("Unknown table type " + tableType);
+          LOG.error("Unknown table type {}", tableType);
           throw new InvalidDatasetException(syncConfig.basePath);
       }
     } catch (IOException e) {
@@ -441,7 +441,7 @@ public class HoodieHiveClient {
     MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
     // Fall back to read the schema from last compaction
     if (messageType == null) {
-      LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
+      LOG.info("Falling back to read the schema from last compaction {}", lastCompactionCommitOpt);
       return readSchemaFromLastCompaction(lastCompactionCommitOpt);
     }
     return messageType;
@@ -451,7 +451,7 @@ public class HoodieHiveClient {
    * Read the parquet schema from a parquet File.
    */
   private MessageType readSchemaFromDataFile(Path parquetFilePath) throws IOException {
-    LOG.info("Reading schema from " + parquetFilePath);
+    LOG.info("Reading schema from {}", parquetFilePath);
     if (!fs.exists(parquetFilePath)) {
       throw new IllegalArgumentException(
           "Failed to read schema from data file " + parquetFilePath + ". File does not exist.");
@@ -482,7 +482,7 @@ public class HoodieHiveClient {
     Statement stmt = null;
     try {
       stmt = connection.createStatement();
-      LOG.info("Executing SQL " + s);
+      LOG.info("Executing SQL {}", s);
       stmt.execute(s);
     } catch (SQLException e) {
       throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
@@ -513,12 +513,12 @@ public class HoodieHiveClient {
       ss = SessionState.start(configuration);
       hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
       final long endTime = System.currentTimeMillis();
-      LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
+      LOG.info("Time taken to start SessionState and create Driver: {} ms", (endTime - startTime));
       for (String sql : sqls) {
         final long start = System.currentTimeMillis();
         responses.add(hiveDriver.run(sql));
         final long end = System.currentTimeMillis();
-        LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
+        LOG.info("Time taken to execute [{}]: {} ms", sql, (end - start));
       }
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed in executing SQL", e);
@@ -552,7 +552,7 @@ public class HoodieHiveClient {
       try {
         this.connection =
            DriverManager.getConnection(syncConfig.jdbcUrl, syncConfig.hiveUser, syncConfig.hivePass);
-        LOG.info("Successfully established Hive connection to " + syncConfig.jdbcUrl);
+        LOG.info("Successfully established Hive connection to {}", syncConfig.jdbcUrl);
       } catch (SQLException e) {
         throw new HoodieHiveSyncException("Cannot create hive connection " + getHiveJdbcUrlWithDefaultDBName(), e);
       }
@@ -630,14 +630,14 @@ public class HoodieHiveClient {
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
-      LOG.info("Last commit time synced is not known, listing all partitions in " + syncConfig.basePath + ",FS :" + fs);
+      LOG.info("Last commit time synced is not known, listing all partitions in {}, FS :{}", syncConfig.basePath, fs);
       try {
         return FSUtils.getAllPartitionPaths(fs, syncConfig.basePath, syncConfig.assumeDatePartitioning);
       } catch (IOException e) {
        throw new HoodieIOException("Failed to list all partitions in " + syncConfig.basePath, e);
       }
     } else {
-      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
+      LOG.info("Last commit time synced is {}, Getting commits since then", lastCommitTimeSynced.get());
       HoodieTimeline timelineToSync =
           activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
       return timelineToSync.getInstants().map(s -> {
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
index d945b58..77c04b5 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
  */
 public class SchemaUtil {
 
-  private static final Logger LOG = LogManager.getLogger(SchemaUtil.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SchemaUtil.class);
   public static final String HIVE_ESCAPE_CHARACTER = "`";
 
   /**
@@ -67,7 +67,7 @@ public class SchemaUtil {
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
     }
-    LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema);
+    LOG.info("Getting schema difference for tableSchema :{}, newTableSchema :{}", tableSchema, newTableSchema);
 
     SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema);
     Set<String> tableColumns = Sets.newHashSet();
@@ -85,7 +85,7 @@ public class SchemaUtil {
           continue;
         }
         // We will log this and continue. Hive schema is a superset of all parquet schemas
-        LOG.warn("Ignoring table column " + fieldName + " as its not present in the parquet schema");
+        LOG.warn("Ignoring table column {} as its not present in the parquet schema", fieldName);
         continue;
       }
       tableColumnType = tableColumnType.replaceAll("\\s+", "");
@@ -112,7 +112,7 @@ public class SchemaUtil {
         schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue());
       }
     }
-    LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString());
+    LOG.info("Difference between schemas: {}", schemaDiffBuilder.build().toString());
 
     return schemaDiffBuilder.build();
   }
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
index d82c33b..5118d19 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
 import org.apache.hive.service.server.HiveServer2;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
@@ -59,7 +59,7 @@ import java.util.concurrent.Executors;
 
 public class HiveTestService {
 
-  private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveTestService.class);
 
   private static final int CONNECTION_TIMEOUT = 30000;
 
@@ -95,7 +95,7 @@ public class HiveTestService {
 
     String localHiveLocation = getHiveLocation(workDir);
     if (clean) {
-      LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
+      LOG.info("Cleaning Hive cluster data at: {} and starting fresh.", localHiveLocation);
       File file = new File(localHiveLocation);
       FileIOUtils.deleteDirectory(file);
     }
@@ -155,7 +155,7 @@ public class HiveTestService {
         return true;
       } catch (MetaException e) {
         // ignore as this is expected
-        LOG.info("server " + hostname + ":" + port + " not up " + e);
+        LOG.error("server {}:{} not up ", hostname, port, e);
       }
 
       if (System.currentTimeMillis() > start + timeout) {
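
For context on the pattern this commit applies throughout hudi-hive: SLF4J's
"{}" placeholders defer message formatting until the logger confirms the level
is enabled, so the string concatenations and String.format calls removed above
no longer pay their cost when, say, INFO is turned off. Below is a minimal
sketch of the before and after, assuming only slf4j-api on the classpath; the
class, method, and variable names are illustrative and do not come from the
Hudi codebase.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingExample {

  // SLF4J replaces log4j's LogManager.getLogger(...) with LoggerFactory.
  private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingExample.class);

  public void sync(String tableName, int partitionCount) {
    // Old log4j style: the message string is built eagerly, even when INFO is disabled.
    //   LOG.info("Sync complete for " + tableName + ", partitions: " + partitionCount);

    // SLF4J style: each {} is substituted only if INFO is actually enabled.
    LOG.info("Sync complete for {}, partitions: {}", tableName, partitionCount);

    try {
      throw new IllegalStateException("simulated failure");
    } catch (IllegalStateException e) {
      // A trailing Throwable with no matching {} is treated as the exception
      // to log, stack trace included -- unlike the old "... " + e
      // concatenation, which only appended e.toString(). This convention is
      // what the HiveTestService change above relies on.
      LOG.error("Sync failed for {}", tableName, e);
    }
  }
}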
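
Note also that the pom.xml change adds only slf4j-api, the logging facade; the
${slf4j.version} property is presumably defined in a parent POM, and a binding
(for example slf4j-log4j12, routing to the log4j dependency the module already
declares) is still required at runtime for these statements to produce output.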