This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/redo-log by this push: new b98e931 [HUDI-463] Redo hudi-utilities log statements using SLF4J (#1177) b98e931 is described below commit b98e93179b33e4d837ee0fb41b4fa59c6e1c0e76 Author: ForwardXu <forwardxu...@gmail.com> AuthorDate: Mon Jan 6 22:47:01 2020 +0800 [HUDI-463] Redo hudi-utilities log statements using SLF4J (#1177) --- hudi-utilities/pom.xml | 5 +++ .../apache/hudi/utilities/HDFSParquetImporter.java | 15 +++---- .../hudi/utilities/HiveIncrementalPuller.java | 47 +++++++++++----------- .../org/apache/hudi/utilities/HoodieCleaner.java | 8 ++-- .../org/apache/hudi/utilities/HoodieCompactor.java | 8 ++-- .../hudi/utilities/HoodieSnapshotCopier.java | 6 +-- .../org/apache/hudi/utilities/UtilHelpers.java | 8 ++-- .../adhoc/UpgradePayloadFromUberToApache.java | 32 ++++++++------- .../AbstractDeltaStreamerService.java | 7 ++-- .../hudi/utilities/deltastreamer/Compactor.java | 11 ++--- .../hudi/utilities/deltastreamer/DeltaSync.java | 41 ++++++++++--------- .../deltastreamer/HoodieDeltaStreamer.java | 23 ++++++----- .../deltastreamer/SchedulerConfGenerator.java | 8 ++-- .../hudi/utilities/perf/TimelineServerPerf.java | 8 ++-- .../hudi/utilities/sources/AvroKafkaSource.java | 8 ++-- .../hudi/utilities/sources/HiveIncrPullSource.java | 8 ++-- .../hudi/utilities/sources/HoodieIncrSource.java | 8 ++-- .../hudi/utilities/sources/JsonKafkaSource.java | 8 ++-- .../org/apache/hudi/utilities/sources/Source.java | 6 +-- .../utilities/sources/helpers/KafkaOffsetGen.java | 5 --- .../utilities/transform/FlatteningTransformer.java | 8 ++-- .../transform/SqlQueryBasedTransformer.java | 10 ++--- .../hudi/utilities/TestHoodieDeltaStreamer.java | 12 +++--- .../utilities/sources/AbstractBaseTestSource.java | 6 +-- .../sources/DistributedTestDataSource.java | 6 +-- .../hudi/utilities/sources/TestDataSource.java | 6 +-- 26 files changed, 165 insertions(+), 153 deletions(-) diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index ba59e22..00c6ecb 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -124,6 +124,11 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> <!-- Fasterxml --> <dependency> diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java index 4aa72d0..8b141a2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java @@ -42,8 +42,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.spark.api.java.JavaRDD; @@ -59,6 +57,8 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -66,7 +66,7 @@ import scala.Tuple2; */ public class HDFSParquetImporter implements Serializable { - private static final Logger LOG = LogManager.getLogger(HDFSParquetImporter.class); + private static final Logger LOG = 
LoggerFactory.getLogger(HDFSParquetImporter.class); private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd") .withZone(ZoneId.systemDefault()); @@ -103,7 +103,7 @@ public class HDFSParquetImporter implements Serializable { this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration()); this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); - LOG.info("Starting data import with configs : " + props.toString()); + LOG.info("Starting data import with configs : {}", props.toString()); int ret = -1; try { // Verify that targetPath is not present. @@ -114,7 +114,7 @@ public class HDFSParquetImporter implements Serializable { ret = dataImport(jsc); } while (ret != 0 && retry-- > 0); } catch (Throwable t) { - LOG.error(t); + LOG.error("The dataImport error:", t); } return ret; } @@ -175,13 +175,14 @@ public class HDFSParquetImporter implements Serializable { throw new HoodieIOException("row field is missing. :" + cfg.rowKey); } String partitionPath = partitionField.toString(); - LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")"); + LOG.debug("Row Key : {}, Partition Path is ({})", rowField, partitionPath); if (partitionField instanceof Number) { try { long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L); partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts)); } catch (NumberFormatException nfe) { - LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")"); + LOG.warn("Unable to parse date from partition field. Assuming partition as ({})", + partitionField); } } return new HoodieRecord<>(new HoodieKey(rowField.toString(), partitionPath), diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java index 963bc7d..0a2ecde 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java @@ -33,8 +33,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.stringtemplate.v4.ST; import java.io.File; @@ -49,6 +47,8 @@ import java.sql.Statement; import java.util.List; import java.util.Scanner; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utility to pull data after a given commit, based on the supplied HiveQL and save the delta as another hive temporary @@ -61,7 +61,7 @@ import java.util.stream.Collectors; */ public class HiveIncrementalPuller { - private static final Logger LOG = LogManager.getLogger(HiveIncrementalPuller.class); + private static final Logger LOG = LoggerFactory.getLogger(HiveIncrementalPuller.class); private static String driverName = "org.apache.hive.jdbc.HiveDriver"; public static class Config implements Serializable { @@ -129,10 +129,10 @@ public class HiveIncrementalPuller { try { if (config.fromCommitTime == null) { config.fromCommitTime = inferCommitTime(fs); - LOG.info("FromCommitTime inferred as " + config.fromCommitTime); + LOG.info("FromCommitTime inferred as {}", config.fromCommitTime); } - LOG.info("FromCommitTime - " + 
config.fromCommitTime); + LOG.info("FromCommitTime - {}", config.fromCommitTime); String sourceTableLocation = getTableLocation(config.sourceDb, config.sourceTable); String lastCommitTime = getLastCommitTimePulled(fs, sourceTableLocation); if (lastCommitTime == null) { @@ -180,15 +180,16 @@ public class HiveIncrementalPuller { incrementalPullSQLtemplate.add("storedAsClause", storedAsClause); String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next(); if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) { - LOG.info("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable - + ", which means its pulling from a different table. Fencing this from happening."); + LOG.info( + "Incremental SQL does not have {}.{}, which means its pulling from a different table. Fencing this from happening.", + config.sourceDb, config.sourceTable); throw new HoodieIncrementalPullSQLException( "Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable); } if (!incrementalSQL.contains("`_hoodie_commit_time` > '%targetBasePath'")) { - LOG.info("Incremental SQL : " + incrementalSQL - + " does not contain `_hoodie_commit_time` > '%targetBasePath'. Please add " - + "this clause for incremental to work properly."); + LOG.info( + "Incremental SQL : {} does not contain `_hoodie_commit_time` > '%targetBasePath'. Please add " + + "this clause for incremental to work properly.", incrementalSQL); throw new HoodieIncrementalPullSQLException( "Incremental SQL does not have clause `_hoodie_commit_time` > '%targetBasePath', which " + "means its not pulling incrementally"); @@ -224,18 +225,18 @@ public class HiveIncrementalPuller { } private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException { - LOG.info("Deleting path " + path); + LOG.info("Deleting path {}", path); return fs.delete(new Path(path), true); } private void executeStatement(String sql, Statement stmt) throws SQLException { - LOG.info("Executing: " + sql); + LOG.info("Executing: {}", sql); stmt.execute(sql); } private String inferCommitTime(FileSystem fs) throws SQLException, IOException { - LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie dataset " + config.targetDb + "." - + config.targetTable); + LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie dataset {}.{}", + config.targetDb, config.targetTable); String targetDataLocation = getTableLocation(config.targetDb, config.targetTable); return scanForCommitTime(fs, targetDataLocation); } @@ -249,7 +250,7 @@ public class HiveIncrementalPuller { resultSet = stmt.executeQuery("describe formatted `" + db + "." + table + "`"); while (resultSet.next()) { if (resultSet.getString(1).trim().equals("Location:")) { - LOG.info("Inferred table location for " + db + "." 
+ table + " as " + resultSet.getString(2)); + LOG.info("Inferred table location for {}.{} as {}", db, table, resultSet.getString(2)); return resultSet.getString(2); } } @@ -290,7 +291,7 @@ public class HiveIncrementalPuller { private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime) throws IOException { Path targetBaseDirPath = new Path(config.hoodieTmpDir, config.targetTable + "__" + config.sourceTable); if (!fs.exists(targetBaseDirPath)) { - LOG.info("Creating " + targetBaseDirPath + " with permission drwxrwxrwx"); + LOG.info("Creating {} with permission drwxrwxrwx", targetBaseDirPath); boolean result = FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); if (!result) { @@ -305,7 +306,7 @@ public class HiveIncrementalPuller { throw new HoodieException("Could not delete existing " + targetPath); } } - LOG.info("Creating " + targetPath + " with permission drwxrwxrwx"); + LOG.info("Creating {} with permission drwxrwxrwx", targetPath); return FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); } @@ -316,19 +317,19 @@ public class HiveIncrementalPuller { .collect(Collectors.toList()); if (commitsToSync.isEmpty()) { LOG.warn( - "Nothing to sync. All commits in " - + config.sourceTable + " are " + metadata.getActiveTimeline().getCommitsTimeline() - .filterCompletedInstants().getInstants().collect(Collectors.toList()) - + " and from commit time is " + config.fromCommitTime); + "Nothing to sync. All commits in {} are {} and from commit time is {}", + config.sourceTable, metadata.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants().collect(Collectors.toList()), + config.fromCommitTime, config.sourceTable); return null; } - LOG.info("Syncing commits " + commitsToSync); + LOG.info("Syncing commits {}", commitsToSync); return commitsToSync.get(commitsToSync.size() - 1); } private Connection getConnection() throws SQLException { if (connection == null) { - LOG.info("Getting Hive Connection to " + config.hiveJDBCUrl); + LOG.info("Getting Hive Connection to {}", config.hiveJDBCUrl); this.connection = DriverManager.getConnection(config.hiveJDBCUrl, config.hiveUsername, config.hivePassword); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java index 9185d97..d21e4c6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java @@ -27,18 +27,18 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HoodieCleaner { - private static final Logger LOG = LogManager.getLogger(HoodieCleaner.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieCleaner.class); /** * Config for Cleaner. @@ -66,7 +66,7 @@ public class HoodieCleaner { this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration()); this.props = cfg.propsFilePath == null ? 
UtilHelpers.buildProperties(cfg.configs) : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); - LOG.info("Creating Cleaner with configs : " + props.toString()); + LOG.info("Creating Cleaner with configs : {}", props.toString()); } public void run() throws Exception { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 4ace07c..31a6ab4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -28,18 +28,18 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HoodieCompactor { - private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieCompactor.class); private final Config cfg; private transient FileSystem fs; private TypedProperties props; @@ -110,7 +110,7 @@ public class HoodieCompactor { } } while (ret != 0 && retry-- > 0); } catch (Throwable t) { - LOG.error(t); + LOG.error("The compact error:", t); } return ret; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java index d24319e..2d64029 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java @@ -36,8 +36,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -47,6 +45,8 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -54,7 +54,7 @@ import scala.Tuple2; */ public class HoodieSnapshotCopier implements Serializable { - private static final Logger LOG = LogManager.getLogger(HoodieSnapshotCopier.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieSnapshotCopier.class); static class Config implements Serializable { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 4cb56e9..65b8e8f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -37,8 +37,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -53,12 +51,14 @@ 
import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Bunch of helper methods. */ public class UtilHelpers { - private static final Logger LOG = LogManager.getLogger(UtilHelpers.class); + private static final Logger LOG = LoggerFactory.getLogger(UtilHelpers.class); public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException { @@ -97,7 +97,7 @@ public class UtilHelpers { conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath); } catch (Exception e) { conf = new DFSPropertiesConfiguration(); - LOG.warn("Unexpected error read props file at :" + cfgPath, e); + LOG.warn("Unexpected error read props file at :{}", cfgPath, e); } try { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java index 6040437..8a6f75a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java @@ -28,8 +28,6 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import java.io.BufferedReader; import java.io.FileReader; @@ -38,15 +36,18 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * This is an one-time use class meant for migrating the configuration for "hoodie.compaction.payload.class" in - * .hoodie/hoodie.properties from com.uber.hoodie to org.apache.hudi . It takes in a file containing base-paths for a set - * of hudi datasets and does the migration + * This is an one-time use class meant for migrating the configuration for + * "hoodie.compaction.payload.class" in .hoodie/hoodie.properties from com.uber.hoodie to + * org.apache.hudi . It takes in a file containing base-paths for a set of hudi datasets and does + * the migration. 
*/ public class UpgradePayloadFromUberToApache implements Serializable { - private static final Logger LOG = LogManager.getLogger(UpgradePayloadFromUberToApache.class); + private static final Logger LOG = LoggerFactory.getLogger(UpgradePayloadFromUberToApache.class); private final Config cfg; @@ -59,36 +60,37 @@ public class UpgradePayloadFromUberToApache implements Serializable { try (BufferedReader reader = new BufferedReader(new FileReader(cfg.inputPath))) { basePath = reader.readLine(); } catch (IOException e) { - LOG.error("Read from path: " + cfg.inputPath + " error.", e); + LOG.error("Read from path: {} error.", cfg.inputPath, e); } while (basePath != null) { basePath = basePath.trim(); if (!basePath.startsWith("#")) { - LOG.info("Performing upgrade for " + basePath); + LOG.info("Performing upgrade for {}", basePath); String metaPath = String.format("%s/.hoodie", basePath); HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(FSUtils.prepareHadoopConf(new Configuration()), basePath, false); + new HoodieTableMetaClient(FSUtils.prepareHadoopConf(new Configuration()), basePath, + false); HoodieTableConfig tableConfig = metaClient.getTableConfig(); if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) { Map<String, String> propsMap = tableConfig.getProps(); if (propsMap.containsKey(HoodieCompactionConfig.PAYLOAD_CLASS_PROP)) { String payloadClass = propsMap.get(HoodieCompactionConfig.PAYLOAD_CLASS_PROP); - LOG.info("Found payload class=" + payloadClass); + LOG.info("Found payload class={}", payloadClass); if (payloadClass.startsWith("com.uber.hoodie")) { String newPayloadClass = payloadClass.replace("com.uber.hoodie", "org.apache.hudi"); - LOG.info("Replacing payload class (" + payloadClass + ") with (" + newPayloadClass + ")"); + LOG.info("Replacing payload class ({}) with ({})", payloadClass, newPayloadClass); Map<String, String> newPropsMap = new HashMap<>(propsMap); newPropsMap.put(HoodieCompactionConfig.PAYLOAD_CLASS_PROP, newPayloadClass); Properties props = new Properties(); props.putAll(newPropsMap); - HoodieTableConfig.createHoodieProperties(metaClient.getFs(), new Path(metaPath), props); - LOG.info("Finished upgrade for " + basePath); + HoodieTableConfig + .createHoodieProperties(metaClient.getFs(), new Path(metaPath), props); + LOG.info("Finished upgrade for {}", basePath); } } } else { - LOG.info("Skipping as this table is COW table. BasePath=" + basePath); - + LOG.info("Skipping as this table is COW table. 
BasePath={}", basePath); } } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java index 5d36e8d..8fd62e7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java @@ -20,9 +20,6 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.common.util.collection.Pair; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - import java.io.Serializable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -30,13 +27,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Base Class for running delta-sync/compaction in separate thread and controlling their life-cyle. */ public abstract class AbstractDeltaStreamerService implements Serializable { - private static final Logger LOG = LogManager.getLogger(AbstractDeltaStreamerService.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractDeltaStreamerService.class); // Flag to track if the service is started. private boolean started; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java index eb3212f..65bf598 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java @@ -24,20 +24,20 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.io.Serializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Run one round of compaction. */ public class Compactor implements Serializable { - private static final Logger LOG = LogManager.getLogger(Compactor.class); + private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); private transient HoodieWriteClient compactionClient; private transient JavaSparkContext jssc; @@ -48,12 +48,13 @@ public class Compactor implements Serializable { } public void compact(HoodieInstant instant) throws IOException { - LOG.info("Compactor executing compaction " + instant); + LOG.info("Compactor executing compaction {}", instant); JavaRDD<WriteStatus> res = compactionClient.compact(instant.getTimestamp()); long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count(); if (numWriteErrors != 0) { // We treat even a single error in compaction as fatal - LOG.error("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors); + LOG.error("Compaction for instant ({}) failed with write errors. Errors :{}", instant, + numWriteErrors); throw new HoodieException( "Compaction for instant (" + instant + ") failed with write errors. 
Errors :" + numWriteErrors); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 7dfb015..20608b7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -56,8 +56,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -73,6 +71,8 @@ import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; @@ -83,7 +83,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC */ public class DeltaSync implements Serializable { - private static final Logger LOG = LogManager.getLogger(DeltaSync.class); + private static final Logger LOG = LoggerFactory.getLogger(DeltaSync.class); public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key"; @@ -168,7 +168,7 @@ public class DeltaSync implements Serializable { this.tableType = tableType; this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient; this.props = props; - LOG.info("Creating delta streamer with configs : " + props.toString()); + LOG.info("Creating delta streamer with configs : {}", props.toString()); this.schemaProvider = schemaProvider; refreshTimeline(); @@ -266,7 +266,7 @@ public class DeltaSync implements Serializable { if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) { resumeCheckpointStr = Option.of(cfg.checkpoint); } - LOG.info("Checkpoint to resume from : " + resumeCheckpointStr); + LOG.info("Checkpoint to resume from : {}", resumeCheckpointStr); final Option<JavaRDD<GenericRecord>> avroRDDOptional; final String checkpointStr; @@ -300,8 +300,9 @@ public class DeltaSync implements Serializable { } if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { - LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" - + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"); + LOG.info( + "No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=({}). New Checkpoint=({})", + resumeCheckpointStr, checkpointStr); return null; } @@ -342,7 +343,7 @@ public class DeltaSync implements Serializable { boolean isEmpty = records.isEmpty(); String commitTime = startCommit(); - LOG.info("Starting commit : " + commitTime); + LOG.info("Starting commit : {}", commitTime); JavaRDD<WriteStatus> writeStatusRDD; if (cfg.operation == Operation.INSERT) { @@ -367,13 +368,14 @@ public class DeltaSync implements Serializable { } if (hasErrors) { - LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" - + totalErrorRecords + "/" + totalRecords); + LOG.warn( + "Some records failed to be merged but forcing commit since commitOnErrors set. 
Errors/Total={}/{}", + totalErrorRecords, totalRecords); } boolean success = writeClient.commit(commitTime, writeStatusRDD, Option.of(checkpointCommitMetadata)); if (success) { - LOG.info("Commit " + commitTime + " successful!"); + LOG.info("Commit {} successful!", commitTime); // Schedule compaction if needed if (cfg.isAsyncCompactionEnabled()) { @@ -387,16 +389,18 @@ public class DeltaSync implements Serializable { hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0; } } else { - LOG.info("Commit " + commitTime + " failed!"); + LOG.info("Commit {} failed!", commitTime); throw new HoodieException("Commit " + commitTime + " failed!"); } } else { - LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); + LOG.error("Delta Sync found errors when writing. Errors/Total={}/{}", totalErrorRecords, + totalRecords); LOG.error("Printing out the top 100 errors"); writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> { - LOG.error("Global error :", ws.getGlobalError()); + LOG.error("Global error :{}", ws.getGlobalError()); if (ws.getErrors().size() > 0) { - ws.getErrors().entrySet().forEach(r -> LOG.trace("Error for key:" + r.getKey() + " is " + r.getValue())); + ws.getErrors().entrySet() + .forEach(r -> LOG.trace("Error for key:{} is {}", r.getKey(), r.getValue())); } }); // Rolling back instant @@ -438,8 +442,9 @@ public class DeltaSync implements Serializable { private void syncHive() throws ClassNotFoundException { if (cfg.enableHiveSync) { HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath); - LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :" - + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath); + LOG.info( + "Syncing target hoodie table with hive table({}). 
Hive metastore URL :{}, basePath :{}", + hiveSyncConfig.tableName, hiveSyncConfig.jdbcUrl, cfg.targetBasePath); new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable(); } @@ -503,7 +508,7 @@ public class DeltaSync implements Serializable { schemas.add(schemaProvider.getTargetSchema()); } - LOG.info("Registering Schema :" + schemas); + LOG.info("Registering Schema :{}", schemas); jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index f8ddadb..3736e87 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -46,8 +46,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; @@ -66,6 +64,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target @@ -78,7 +78,7 @@ import java.util.stream.IntStream; */ public class HoodieDeltaStreamer implements Serializable { - private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieDeltaStreamer.class); public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; @@ -131,7 +131,7 @@ public class HoodieDeltaStreamer implements Serializable { } private boolean onDeltaSyncShutdown(boolean error) { - LOG.info("DeltaSync shutdown. Closing write client. Error?" + error); + LOG.info("DeltaSync shutdown. Closing write client. Error?{}", error); deltaSyncService.close(); return true; } @@ -363,7 +363,7 @@ public class HoodieDeltaStreamer implements Serializable { } this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); - LOG.info("Creating delta streamer with configs : " + props.toString()); + LOG.info("Creating delta streamer with configs : {}", props.toString()); this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); if (cfg.filterDupes) { @@ -385,7 +385,7 @@ public class HoodieDeltaStreamer implements Serializable { boolean error = false; if (cfg.isAsyncCompactionEnabled()) { // set Scheduler Pool. 
- LOG.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME); + LOG.info("Setting Spark Pool name for delta-sync to {}", SchedulerConfGenerator.DELTASYNC_POOL_NAME); jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME); } try { @@ -394,15 +394,15 @@ public class HoodieDeltaStreamer implements Serializable { long start = System.currentTimeMillis(); Option<String> scheduledCompactionInstant = deltaSync.syncOnce(); if (scheduledCompactionInstant.isPresent()) { - LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")"); + LOG.info("Enqueuing new pending compaction instant ({})", scheduledCompactionInstant); asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstant.get())); asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); } long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); if (toSleepMs > 0) { - LOG.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: " - + toSleepMs + " ms."); + LOG.info("Last sync ran less than min sync interval: {} s, sleep: {} ms", + cfg.minSyncIntervalSeconds, toSleepMs); Thread.sleep(toSleepMs); } } catch (Exception e) { @@ -422,7 +422,7 @@ public class HoodieDeltaStreamer implements Serializable { * Shutdown compactor as DeltaSync is shutdown. */ private void shutdownCompactor(boolean error) { - LOG.info("Delta Sync shutdown. Error ?" + error); + LOG.info("Delta Sync shutdown. Error ?{}", error); if (asyncCompactService != null) { LOG.warn("Gracefully shutting down compactor"); asyncCompactService.shutdown(false); @@ -561,7 +561,8 @@ public class HoodieDeltaStreamer implements Serializable { IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { try { // Set Compactor Pool Name for allowing users to prioritize compaction - LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME); + LOG.info("Setting Spark Pool name for compaction to {}", + SchedulerConfGenerator.COMPACT_POOL_NAME); jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME); while (!isShutdownRequested()) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java index 09c4da0..862d59d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java @@ -21,8 +21,6 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.util.Option; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import java.io.BufferedWriter; @@ -32,6 +30,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utility Class to generate Spark Scheduling allocation file. 
This kicks in only when user sets @@ -39,7 +39,7 @@ import java.util.UUID; */ public class SchedulerConfGenerator { - private static final Logger LOG = LogManager.getLogger(SchedulerConfGenerator.class); + private static final Logger LOG = LoggerFactory.getLogger(SchedulerConfGenerator.class); public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync"; public static final String COMPACT_POOL_NAME = "hoodiecompact"; @@ -88,7 +88,7 @@ public class SchedulerConfGenerator { BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile)); bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare)); bw.close(); - LOG.info("Configs written to file" + tempConfigFile.getAbsolutePath()); + LOG.info("Configs written to file {}", tempConfigFile.getAbsolutePath()); return tempConfigFile.getAbsolutePath(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java index 1108f65..f8b6a6a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java @@ -37,8 +37,6 @@ import com.codahale.metrics.UniformReservoir; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -55,10 +53,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TimelineServerPerf implements Serializable { - private static final Logger LOG = LogManager.getLogger(TimelineServerPerf.class); + private static final Logger LOG = LoggerFactory.getLogger(TimelineServerPerf.class); private final Config cfg; private transient TimelineService timelineServer; private final boolean useExternalTimelineServer; @@ -73,7 +73,7 @@ public class TimelineServerPerf implements Serializable { private void setHostAddrFromSparkConf(SparkConf sparkConf) { String hostAddr = sparkConf.get("spark.driver.host", null); if (hostAddr != null) { - LOG.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr); + LOG.info("Overriding hostIp to ({}) found in spark-conf. 
It was {}", hostAddr, this.hostAddr); this.hostAddr = hostAddr; } else { LOG.warn("Unable to find driver bind address from spark config"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index 18ebff4..9a59333 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -27,20 +27,20 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; import io.confluent.kafka.serializers.KafkaAvroDecoder; import kafka.serializer.StringDecoder; import org.apache.avro.generic.GenericRecord; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.kafka.OffsetRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Reads avro serialized Kafka data, based on the confluent schema-registry. */ public class AvroKafkaSource extends AvroSource { - private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class); + private static final Logger LOG = LoggerFactory.getLogger(AvroKafkaSource.class); private final KafkaOffsetGen offsetGen; @@ -57,7 +57,7 @@ public class AvroKafkaSource extends AvroSource { if (totalNewMsgs <= 0) { return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : ""); } else { - LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); + LOG.info("About to read {} from Kafka for topic :{}", totalNewMsgs, offsetGen.getTopicName()); } JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges); return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java index 666c260..5e546f3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java @@ -33,8 +33,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -46,6 +44,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Source to read deltas produced by {@link HiveIncrementalPuller}, commit by commit and apply to the target table @@ -59,7 +59,7 @@ import java.util.stream.Collectors; */ public class HiveIncrPullSource extends AvroSource { - private static final Logger LOG = LogManager.getLogger(HiveIncrPullSource.class); + private static final Logger LOG = LoggerFactory.getLogger(HiveIncrPullSource.class); private final transient FileSystem fs; @@ -95,7 +95,7 @@ public class HiveIncrPullSource extends AvroSource { 
commitTimes.add(splits[splits.length - 1]); } Collections.sort(commitTimes); - LOG.info("Retrieved commit times " + commitTimes); + LOG.info("Retrieved commit times {}", commitTimes); if (!latestTargetCommit.isPresent()) { // start from the beginning diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 888eec7..532104d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -28,8 +28,6 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.Dataset; @@ -37,10 +35,12 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HoodieIncrSource extends RowSource { - private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieIncrSource.class); protected static class Config { @@ -109,7 +109,7 @@ public class HoodieIncrSource extends RowSource { numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt); if (instantEndpts.getKey().equals(instantEndpts.getValue())) { - LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey()); + LOG.warn("Already caught up. Begin Checkpoint was :{}", instantEndpts.getKey()); return Pair.of(Option.empty(), instantEndpts.getKey()); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index bd922ac..a146264 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -25,20 +25,20 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; import kafka.serializer.StringDecoder; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.kafka.OffsetRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Read json kafka data. */ public class JsonKafkaSource extends JsonSource { - private static final Logger LOG = LogManager.getLogger(JsonKafkaSource.class); + private static final Logger LOG = LoggerFactory.getLogger(JsonKafkaSource.class); private final KafkaOffsetGen offsetGen; @@ -55,7 +55,7 @@ public class JsonKafkaSource extends JsonSource { if (totalNewMsgs <= 0) { return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? 
lastCheckpointStr.get() : ""); } - LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); + LOG.info("About to read {} from Kafka for topic :{}", totalNewMsgs, offsetGen.getTopicName()); JavaRDD<String> newDataRDD = toRDD(offsetRanges); return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java index 0760c73..d9d3299 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java @@ -22,18 +22,18 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.utilities.schema.SchemaProvider; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import java.io.Serializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents a source from which we can tail data. Assumes a constructor that takes properties. */ public abstract class Source<T> implements Serializable { - private static final Logger LOG = LogManager.getLogger(Source.class); + private static final Logger LOG = LoggerFactory.getLogger(Source.class); public enum SourceType { JSON, AVRO, ROW diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index c17a5cf..3810e94 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -25,8 +25,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import kafka.common.TopicAndPartition; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.streaming.kafka.KafkaCluster; import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset; import org.apache.spark.streaming.kafka.OffsetRange; @@ -50,9 +48,6 @@ import scala.util.Either; * Source to read data from Kafka, incrementally. 
*/ public class KafkaOffsetGen { - - private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class); - public static class CheckpointUtils { /** diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java index aabcb73..9f530e2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java @@ -20,8 +20,6 @@ package org.apache.hudi.utilities.transform; import org.apache.hudi.common.util.TypedProperties; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -30,6 +28,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Transformer that can flatten nested objects. It currently doesn't unnest arrays. @@ -37,7 +37,7 @@ import java.util.UUID; public class FlatteningTransformer implements Transformer { private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_"; - private static final Logger LOG = LogManager.getLogger(SqlQueryBasedTransformer.class); + private static final Logger LOG = LoggerFactory.getLogger(SqlQueryBasedTransformer.class); /** * Configs supported. @@ -48,7 +48,7 @@ public class FlatteningTransformer implements Transformer { // tmp table name doesn't like dashes String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); - LOG.info("Registering tmp table : " + tmpTable); + LOG.info("Registering tmp table : {}", tmpTable); rowDataset.registerTempTable(tmpTable); return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java index 8210fb1..1db5fd0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java @@ -20,14 +20,14 @@ package org.apache.hudi.utilities.transform; import org.apache.hudi.common.util.TypedProperties; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A transformer that allows a sql-query template be used to transform the source before writing to Hudi data-set. 
@@ -36,7 +36,7 @@ import java.util.UUID; */ public class SqlQueryBasedTransformer implements Transformer { - private static final Logger LOG = LogManager.getLogger(SqlQueryBasedTransformer.class); + private static final Logger LOG = LoggerFactory.getLogger(SqlQueryBasedTransformer.class); private static final String SRC_PATTERN = "<SRC>"; private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_"; @@ -59,10 +59,10 @@ public class SqlQueryBasedTransformer implements Transformer { // tmp table name doesn't like dashes String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); - LOG.info("Registering tmp table : " + tmpTable); + LOG.info("Registering tmp table : {}", tmpTable); rowDataset.registerTempTable(tmpTable); String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable); - LOG.info("SQL Query for transformation : (" + sqlStr + ")"); + LOG.info("SQL Query for transformation : ({})", sqlStr); return sparkSession.sql(sqlStr); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index c5f6c76..a42b5d6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -50,8 +50,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -77,6 +75,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -89,7 +89,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; - private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); + private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamer.class); @BeforeClass public static void initClass() throws Exception { @@ -247,7 +247,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { static void assertAtleastNCompactionCommits(int minExpected, String datasetPath, FileSystem fs) { HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath); HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + LOG.info("Timeline Instants={}", + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); int numCompactionCommits = (int) timeline.getInstants().count(); assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits); } @@ -255,7 +256,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) { HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), 
datasetPath); HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); - LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + LOG.info("Timeline Instants={}", + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); int numDeltaCommits = (int) timeline.getInstants().count(); assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java index 745b0f0..941ddeb 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java @@ -29,8 +29,6 @@ import org.apache.hudi.utilities.sources.config.TestSourceConfig; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; @@ -39,10 +37,12 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractBaseTestSource extends AvroSource { - private static final Logger LOG = LogManager.getLogger(AbstractBaseTestSource.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseTestSource.class); static final int DEFAULT_PARTITION_NUM = 0; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java index 7153b2e..3c43aa3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java @@ -24,21 +24,21 @@ import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.config.TestSourceConfig; import org.apache.avro.generic.GenericRecord; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Test DataSource which scales test-data generation by using spark parallelism. 
*/ public class DistributedTestDataSource extends AbstractBaseTestSource { - private static final Logger LOG = LogManager.getLogger(DistributedTestDataSource.class); + private static final Logger LOG = LoggerFactory.getLogger(DistributedTestDataSource.class); private final int numTestSourcePartitions; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java index 0b52db9..ae09474 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java @@ -23,21 +23,21 @@ import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.avro.generic.GenericRecord; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import java.util.List; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An implementation of {@link Source}, that emits test upserts. */ public class TestDataSource extends AbstractBaseTestSource { - private static final Logger LOG = LogManager.getLogger(TestDataSource.class); + private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class); public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
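
The change above applies one pattern throughout hudi-utilities: the log4j LogManager/Logger pair is swapped for SLF4J's LoggerFactory/Logger (backed by the new slf4j-api dependency in pom.xml), and string-concatenated messages become parameterized "{}" templates, so the message string is only built when the target log level is actually enabled. A minimal before/after sketch of that pattern follows; the ExampleImporter class and its fields are illustrative only, not part of this commit:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical class, for illustration only -- not taken from the commit.
    public class ExampleImporter {

      // Before: private static final Logger LOG = LogManager.getLogger(ExampleImporter.class);
      private static final Logger LOG = LoggerFactory.getLogger(ExampleImporter.class);

      public void importPath(String path, int retries) {
        // Before: LOG.info("Importing " + path + " with " + retries + " retries");
        // The {} placeholders defer formatting until INFO is known to be enabled.
        LOG.info("Importing {} with {} retries", path, retries);

        try {
          // ... do the work ...
        } catch (Exception e) {
          // A Throwable passed as the last argument, with no placeholder for it,
          // is logged together with its stack trace.
          LOG.error("Import of {} failed", path, e);
        }
      }
    }

The trailing-Throwable convention also explains a detail visible in the diff: SLF4J's Logger has no error(Throwable) overload, so the old LOG.error(t) calls had to gain an explicit message string, and the exception itself should not be given a "{}" placeholder if its stack trace is wanted.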