This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/redo-log by this push:
     new 8f8cd42  [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)
8f8cd42 is described below

commit 8f8cd42e9792e0a9648b68f933929ad87cd503c5
Author: Mathieu <49835526+wangxian...@users.noreply.github.com>
AuthorDate: Mon Jan 13 11:18:09 2020 +0800

    [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)
---
 hudi-hadoop-mr/pom.xml                             |  7 ++++++
 .../org/apache/hudi/hadoop/HoodieHiveUtil.java     | 12 +++++-----
 .../hudi/hadoop/HoodieParquetInputFormat.java      | 26 ++++++++++----------
 .../hudi/hadoop/HoodieROTablePathFilter.java       | 28 +++++++++++-----------
 .../hudi/hadoop/RecordReaderValueIterator.java     |  6 ++---
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  | 27 ++++++++++-----------
 .../realtime/AbstractRealtimeRecordReader.java     | 24 +++++++++----------
 .../realtime/HoodieParquetRealtimeInputFormat.java | 22 ++++++++---------
 .../realtime/HoodieRealtimeRecordReader.java       |  6 ++---
 .../realtime/RealtimeCompactedRecordReader.java    | 14 +++++------
 10 files changed, 89 insertions(+), 83 deletions(-)

diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index 6bc3c8e..2a222f3 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -81,6 +81,13 @@
       <artifactId>hive-exec</artifactId>
     </dependency>
 
+    <!-- Logging -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
     <!-- Hoodie - Test -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
index 1db8c54..f371719 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
@@ -20,12 +20,12 @@ package org.apache.hudi.hadoop;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HoodieHiveUtil {
 
-  public static final Logger LOG = LogManager.getLogger(HoodieHiveUtil.class);
+  public static final Logger LOG = LoggerFactory.getLogger(HoodieHiveUtil.class);
 
   public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
   public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
@@ -43,20 +43,20 @@ public class HoodieHiveUtil {
     if (maxCommits == MAX_COMMIT_ALL) {
       maxCommits = Integer.MAX_VALUE;
     }
-    LOG.info("Read max commits - " + maxCommits);
+    LOG.info("Read max commits - {}", maxCommits);
     return maxCommits;
   }
 
   public static String readStartCommitTime(JobContext job, String tableName) {
     String startCommitTimestampName = String.format(HOODIE_START_COMMIT_PATTERN, tableName);
-    LOG.info("Read start commit time - " + job.getConfiguration().get(startCommitTimestampName));
+    LOG.info("Read start commit time - {}", job.getConfiguration().get(startCommitTimestampName));
     return job.getConfiguration().get(startCommitTimestampName);
   }
 
   public static String readMode(JobContext job, String tableName) {
     String modePropertyName = String.format(HOODIE_CONSUME_MODE_PATTERN, tableName);
     String mode = job.getConfiguration().get(modePropertyName, DEFAULT_SCAN_MODE);
-    LOG.info(modePropertyName + ": " + mode);
+    LOG.info("Hoodie consume mode pattern is : {}, mode is : {}", modePropertyName, mode);
     return mode;
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index e8f7de0..ea92f11 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -60,7 +60,7 @@ import java.util.stream.Collectors;
 @UseFileSplitsFromInputFormat
 public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetInputFormat.class);
 
   protected Configuration conf;
 
@@ -69,7 +69,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     // Get all the file status from FileInputFormat and then do the filter
     FileStatus[] fileStatuses = super.listStatus(job);
     Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
-    LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
+    LOG.info("Found a total of {} groups", groupedFileStatus.size());
     List<FileStatus> returns = new ArrayList<>();
     for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
       HoodieTableMetaClient metadata = entry.getKey();
@@ -81,7 +81,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       FileStatus[] statuses = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+        LOG.debug("Hoodie Metadata initialized with completed commit Ts as : {}", metadata);
       }
       String tableName = metadata.getTableConfig().getTableName();
       String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
@@ -95,24 +95,24 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
         String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
         // Total number of commits to return in this batch. Set this to -1 to get all the commits.
         Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
-        LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+        LOG.info("Last Incremental timestamp was set as {}", lastIncrementalTs);
         List<String> commitsToReturn = timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants()
             .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
         List<HoodieDataFile> filteredFiles =
             roView.getLatestDataFilesInRange(commitsToReturn).collect(Collectors.toList());
         for (HoodieDataFile filteredFile : filteredFiles) {
-          LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
+          LOG.info("Processing incremental hoodie file - {}", filteredFile.getPath());
           filteredFile = checkFileStatus(filteredFile);
           returns.add(filteredFile.getFileStatus());
         }
-        LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
+        LOG.info("Total paths to process after hoodie incremental filter {}", filteredFiles.size());
       } else {
         // filter files on the latest commit found
         List<HoodieDataFile> filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList());
-        LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+        LOG.info("Total paths to process after hoodie filter {}", filteredFiles.size());
         for (HoodieDataFile filteredFile : filteredFiles) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
+            LOG.debug("Processing latest hoodie file - {}", filteredFile.getPath());
           }
           filteredFile = checkFileStatus(filteredFile);
           returns.add(filteredFile.getFileStatus());
@@ -133,7 +133,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     try {
       if (dataFile.getFileSize() == 0) {
         FileSystem fs = dataPath.getFileSystem(conf);
-        LOG.info("Refreshing file status " + dataFile.getPath());
+        LOG.info("Refreshing file status {}", dataFile.getPath());
         return new HoodieDataFile(fs.getFileStatus(dataPath));
       }
       return dataFile;
@@ -160,7 +160,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
           metadata = getTableMetaClient(status.getPath().getFileSystem(conf), status.getPath().getParent());
           nonHoodieBasePath = null;
         } catch (DatasetNotFoundException | InvalidDatasetException e) {
-          LOG.info("Handling a non-hoodie path " + status.getPath());
+          LOG.info("Handling a non-hoodie path {}", status.getPath());
           metadata = null;
           nonHoodieBasePath = status.getPath().getParent().toString();
         }
@@ -213,7 +213,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       levels = metadata.getPartitionDepth();
     }
     Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
-    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+    LOG.info("Reading hoodie metadata from path {}", baseDir);
     return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index fae8111..4670e4f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
  */
 public class HoodieROTablePathFilter implements PathFilter, Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieROTablePathFilter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieROTablePathFilter.class);
 
   /**
    * Its quite common, to have all files from a given partition path be passed into accept(), cache the check for hoodie
@@ -88,7 +88,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
   public boolean accept(Path path) {
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking acceptance for path " + path);
+      LOG.debug("Checking acceptance for path {}", path);
     }
     Path folder = null;
     try {
@@ -101,15 +101,15 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       // Try to use the caches.
       if (nonHoodiePathCache.contains(folder.toString())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Accepting non-hoodie path from cache: " + path);
+          LOG.debug("Accepting non-hoodie path from cache: {}", path);
         }
         return true;
       }
 
       if (hoodiePathCache.containsKey(folder.toString())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("%s Hoodie path checked against cache, accept => %s \n", path,
-              hoodiePathCache.get(folder.toString()).contains(path)));
+          LOG.debug("{} Hoodie path checked against cache, accept => {}", path,
+              hoodiePathCache.get(folder.toString()).contains(path));
         }
         return hoodiePathCache.get(folder.toString()).contains(path);
       }
@@ -119,7 +119,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       if (filePath.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/")
          || filePath.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Skipping Hoodie Metadata file %s \n", filePath));
+          LOG.debug("Skipping Hoodie Metadata file {}", filePath);
         }
         return false;
       }
@@ -144,22 +144,22 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
           if (!hoodiePathCache.containsKey(folder.toString())) {
             hoodiePathCache.put(folder.toString(), new HashSet<>());
           }
-          LOG.info("Based on hoodie metadata from base path: " + baseDir.toString() + ", caching " + latestFiles.size()
-              + " files under " + folder);
+          LOG.info("Based on hoodie metadata from base path: {}, caching {} files under {}", baseDir,
+              latestFiles.size(), folder);
           for (HoodieDataFile lfile : latestFiles) {
             hoodiePathCache.get(folder.toString()).add(new Path(lfile.getPath()));
           }
 
           // accept the path, if its among the latest files.
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("%s checked after cache population, accept => %s \n", path,
-                hoodiePathCache.get(folder.toString()).contains(path)));
+            LOG.debug("{} checked after cache population, accept => {}", path,
+                hoodiePathCache.get(folder.toString()).contains(path));
           }
           return hoodiePathCache.get(folder.toString()).contains(path);
         } catch (DatasetNotFoundException e) {
           // Non-hoodie path, accept it.
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("(1) Caching non-hoodie path under %s \n", folder.toString()));
+            LOG.debug("(1) Caching non-hoodie path under {}", folder);
           }
           nonHoodiePathCache.add(folder.toString());
           return true;
@@ -167,7 +167,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       } else {
         // files is at < 3 level depth in FS tree, can't be hoodie dataset
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("(2) Caching non-hoodie path under %s \n", folder.toString()));
+          LOG.debug("(2) Caching non-hoodie path under {}", folder);
         }
         nonHoodiePathCache.add(folder.toString());
         return true;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
index 0386186..7ffa3bf 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
@@ -21,8 +21,8 @@ package org.apache.hudi.hadoop;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -36,7 +36,7 @@ import java.util.NoSuchElementException;
  */
 public class RecordReaderValueIterator<K, V> implements Iterator<V> {
 
-  private static final Logger LOG = LogManager.getLogger(RecordReaderValueIterator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RecordReaderValueIterator.class);
 
   private final RecordReader<K, V> reader;
   private V nextVal = null;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index cd1cea3..4ed7ba9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -57,8 +57,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -93,7 +93,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     extends HiveInputFormat<K, V> {
 
   private static final String CLASS_NAME = HoodieCombineHiveInputFormat.class.getName();
-  public static final Logger LOG = LogManager.getLogger(CLASS_NAME);
+  public static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   // max number of threads we can use to check non-combinable paths
   private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
@@ -125,7 +125,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         if (inputFormat instanceof AvoidSplitCombination && ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits");
+            LOG.debug("The path [{}] is being parked for HiveInputFormat.getSplits", paths[i + start]);
           }
           nonCombinablePathIndices.add(i + start);
         }
       }
@@ -388,7 +388,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       Class inputFormatClass = part.getInputFileFormatClass();
       String inputFormatClassName = inputFormatClass.getName();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-      LOG.info("Input Format => " + inputFormatClass.getName());
+      LOG.info("Input Format => {}", inputFormatClass.getName());
       // **MOD** Set the hoodie filter in the combine
       if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) {
         combine.setHoodieFilter(true);
@@ -428,11 +428,11 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         f = poolMap.get(combinePathInputFormat);
         if (f == null) {
           f = new CombineFilter(filterPath);
-          LOG.info("CombineHiveInputSplit creating pool for " + path + "; using filter path " + filterPath);
+          LOG.info("CombineHiveInputSplit creating pool for {}; using filter path {}", path, filterPath);
           combine.createPool(job, f);
           poolMap.put(combinePathInputFormat, f);
         } else {
-          LOG.info("CombineHiveInputSplit: pool is already created for " + path + "; using filter path " + filterPath);
+          LOG.info("CombineHiveInputSplit: pool is already created for {}; using filter path {}", path, filterPath);
           f.addPath(filterPath);
         }
       } else {
@@ -481,7 +481,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       result.add(csplit);
     }
 
-    LOG.info("number of splits " + result.size());
+    LOG.info("number of splits {}", result.size());
 
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
@@ -491,8 +491,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
   @VisibleForTesting
   public Set<Integer> getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads)
       throws ExecutionException, InterruptedException {
-    LOG.info("Total number of paths: " + paths.length + ", launching " + numThreads
-        + " threads to check non-combinable ones.");
+    LOG.info("Total number of paths: {}, launching {} threads to check non-combinable ones.", paths.length, numThreads);
 
     int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -555,8 +554,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     // Store the previous value for the path specification
     String oldPaths = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("The received input paths are: [" + oldPaths + "] against the property "
-          + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
+      LOG.debug("The received input paths are: [{}] against the property {}", oldPaths,
+          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
     }
 
     // Process the normal splits
@@ -589,7 +588,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
 
     // clear work from ThreadLocal after splits generated in case of thread is reused in pool.
     Utilities.clearWorkMapForConf(job);
-    LOG.info("Number of all splits " + result.size());
+    LOG.info("Number of all splits {}", result.size());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);
   }
@@ -691,7 +690,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         retLists.add(split);
         long splitgLength = split.getLength();
         if (size + splitgLength >= targetSize) {
-          LOG.info("Sample alias " + entry.getValue() + " using " + (i + 1) + "splits");
+          LOG.info("Sample alias {} using {} splits", entry.getValue(), i + 1);
           if (size + splitgLength > targetSize) {
             ((InputSplitShim) split).shrinkSplit(targetSize - size);
           }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index d7b50d4..6c7bab1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -45,11 +45,11 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -84,7 +84,7 @@ public abstract class AbstractRealtimeRecordReader {
   // Default file path prefix for spillable file
   public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
 
-  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractRealtimeRecordReader.class);
 
   protected final HoodieRealtimeFileSplit split;
   protected final JobConf jobConf;
@@ -98,12 +98,12 @@ public abstract class AbstractRealtimeRecordReader {
   public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
     this.split = split;
     this.jobConf = job;
-    LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
-    LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-    LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
+    LOG.info("cfg ==> {}", job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
+    LOG.info("columnIds ==> {}", job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    LOG.info("partitioningColumns ==> {}", job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
     try {
       this.usesCustomPayload = usesCustomPayload();
-      LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
+      LOG.info("usesCustomPayload ==> {}", this.usesCustomPayload);
       baseFileSchema = readSchema(jobConf, split.getPath());
       init();
     } catch (IOException e) {
@@ -339,10 +339,10 @@ public abstract class AbstractRealtimeRecordReader {
         LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaFilePaths(), jobConf);
     if (schemaFromLogFile == null) {
       writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
-      LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
+      LOG.debug("Writer Schema From Parquet => {}", writerSchema.getFields());
     } else {
       writerSchema = schemaFromLogFile;
-      LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
+      LOG.debug("Writer Schema From Log => {}", writerSchema.getFields());
     }
     // Add partitioning fields to writer schema for resulting row to contain null values for these fields
     String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
@@ -359,8 +359,8 @@ public abstract class AbstractRealtimeRecordReader {
     // to null out fields not present before
     readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
 
-    LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
-        split.getDeltaFilePaths(), split.getPath(), projectionFields));
+    LOG.info("About to read compacted logs {} for base split {}, projecting cols {}",
+        split.getDeltaFilePaths(), split.getPath(), projectionFields);
   }
 
   private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
@@ -377,7 +377,7 @@ public abstract class AbstractRealtimeRecordReader {
       } else {
         // Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
         // They will get skipped as they won't be found in the original schema.
-        LOG.debug("Skipping Hive Column => " + columnName);
+        LOG.debug("Skipping Hive Column => {}", columnName);
       }
     }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index f62f288..0320257 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -46,8 +46,8 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -65,7 +65,7 @@ import java.util.stream.Stream;
 @UseFileSplitsFromInputFormat
 public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetRealtimeInputFormat.class);
 
   // These positions have to be deterministic across all tables
   public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
@@ -148,7 +148,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
         throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
       }
     });
-    LOG.info("Returning a total splits of " + rtSplits.size());
+    LOG.info("Returning a total splits of {}", rtSplits.size());
     return rtSplits.toArray(new InputSplit[rtSplits.size()]);
   }
 
@@ -180,9 +180,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
       conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNamesPrefix + fieldName);
       conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ",
LOG.debug("Adding extra column {}, to enable log merging cols ({}) ids ({}) ", fieldName, conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), - conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR))); + conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); } } return conf; @@ -210,7 +210,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') { conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1)); if (LOG.isDebugEnabled()) { - LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed"); + LOG.debug("The projection Ids: [{}] start with ','. First comma is removed", columnIds); } } return conf; @@ -226,9 +226,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i // actual heavy lifting of reading the parquet files happen. if (job.get(HOODIE_READ_COLUMNS_PROP) == null) { synchronized (job) { - LOG.info( - "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) - + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); + LOG.info("Before adding Hoodie columns, Projections : {}, Ids : {}", + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); if (job.get(HOODIE_READ_COLUMNS_PROP) == null) { // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table; // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases @@ -244,7 +244,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i } } - LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + LOG.info("Creating record reader with readCols : {}", job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); // sanity check Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit, diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java index cb8606e..bd02971 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -24,8 +24,8 @@ import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -39,7 +39,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, Ar public static final String REALTIME_SKIP_MERGE_PROP = "hoodie.realtime.merge.skip"; // By default, we do merged-reading public static final String DEFAULT_REALTIME_SKIP_MERGE = "false"; - private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReader.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeRecordReader.class); private final RecordReader<NullWritable, ArrayWritable> reader; public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job, diff --git 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index deeaaf4..5ad59af 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
@@ -40,7 +40,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractRealtimeRecordReader.class);
 
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
 
@@ -108,8 +108,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
         Writable[] replaceValue = aWritable.get();
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
-              arrayWritableToString(aWritable)));
+          LOG.debug("key {}, base values: {}, log values: {}", key, arrayWritableToString(arrayWritable),
+              arrayWritableToString(aWritable));
         }
         Writable[] originalValue = arrayWritable.get();
         try {
@@ -117,8 +117,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
           arrayWritable.set(originalValue);
         } catch (RuntimeException re) {
           LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + arrayWritableToString(aWritable));
+          LOG.error("Base record : {}", arrayWritableToString(arrayWritable));
+          LOG.error("Log record : {}", arrayWritableToString(aWritable));
           String errMsg = "Base-record :" + arrayWritableToString(arrayWritable) + " ,Log-record :"
               + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
           throw new RuntimeException(errMsg, re);
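
Every hunk in this patch applies the same substitution: log4j's LogManager.getLogger becomes SLF4J's LoggerFactory.getLogger, and string-concatenated messages become parameterized "{}" messages, with heavier debug output kept behind isDebugEnabled() guards. The snippet below is a minimal, standalone sketch of that pattern only; the class and variable names are made up for illustration and do not appear in the commit, and running it needs an SLF4J binding on the classpath.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical class used only to illustrate the SLF4J usage introduced by this patch.
public class Slf4jMigrationSketch {

  private static final Logger LOG = LoggerFactory.getLogger(Slf4jMigrationSketch.class);

  public static void main(String[] args) {
    String tableName = "example_table";
    int maxCommits = 5;

    // Before (log4j style removed by the patch): the message is built eagerly via concatenation.
    // LOG.info("Read max commits - " + maxCommits);

    // After (SLF4J style added by the patch): {} placeholders, arguments passed separately,
    // so formatting only happens when INFO is actually enabled.
    LOG.info("Read max commits - {} for table {}", maxCommits, tableName);

    // Debug output whose arguments take real work to build stays guarded.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Hoodie consume mode property is : {}", String.format("hoodie.%s.consume.mode", tableName));
    }
  }
}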