This is an automated email from the ASF dual-hosted git repository. vinoyang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 4e5c867 [HUDI-740]Fix can not specify the sparkMaster and code clean for SparkUtil (#1452) 4e5c867 is described below commit 4e5c8671ef3213ffa5c40f09aae27aacfa20f907 Author: hongdd <jn_...@163.com> AuthorDate: Wed Apr 8 21:33:15 2020 +0800 [HUDI-740]Fix can not specify the sparkMaster and code clean for SparkUtil (#1452) --- .../org/apache/hudi/cli/HoodieCliSparkConfig.java | 46 ++++++++++++++++ .../apache/hudi/cli/commands/CleansCommand.java | 2 +- .../hudi/cli/commands/CompactionCommand.java | 16 +++--- .../org/apache/hudi/cli/commands/SparkMain.java | 62 +++++++++------------- .../java/org/apache/hudi/cli/utils/SparkUtil.java | 36 ++++++++----- 5 files changed, 103 insertions(+), 59 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java new file mode 100644 index 0000000..0d64135 --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli; + +/** + * Class storing configs for init spark. + */ +public class HoodieCliSparkConfig { + /** + * Configs to start spark application. + */ + public static final String CLI_SPARK_MASTER = "SPARK_MASTER"; + public static final String CLI_SERIALIZER = "spark.serializer"; + public static final String CLI_DRIVER_MAX_RESULT_SIZE = "spark.driver.maxResultSize"; + public static final String CLI_EVENT_LOG_OVERWRITE = "spark.eventLog.overwrite"; + public static final String CLI_EVENT_LOG_ENABLED = "spark.eventLog.enabled"; + public static final String CLI_EXECUTOR_MEMORY = "spark.executor.memory"; + + /** + * Hadoop output config. + */ + public static final String CLI_MAPRED_OUTPUT_COMPRESS = "spark.hadoop.mapred.output.compress"; + public static final String CLI_MAPRED_OUTPUT_COMPRESSION_CODEC = "spark.hadoop.mapred.output.compression.codec"; + public static final String CLI_MAPRED_OUTPUT_COMPRESSION_TYPE = "spark.hadoop.mapred.output.compression.type"; + + /** + * Parquet file config. + */ + public static final String CLI_PARQUET_ENABLE_SUMMARY_METADATA = "parquet.enable.summary-metadata"; +} diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java index 34321ef..609e44b 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java @@ -139,7 +139,7 @@ public class CleansCommand implements CommandMarker { SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); String cmd = SparkMain.SparkCommand.CLEAN.toString(); - sparkLauncher.addAppArgs(cmd, metaClient.getBasePath(), master, propsFilePath, sparkMemory); + sparkLauncher.addAppArgs(cmd, master, sparkMemory, metaClient.getBasePath(), propsFilePath); UtilHelpers.validateAndAddProperties(configs, sparkLauncher); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 0843a87..a4c70da 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -423,8 +423,8 @@ public class CompactionCommand implements CommandMarker { String sparkPropertiesPath = Utils .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), client.getBasePath(), - compactionInstant, outputPathStr, parallelism, master, sparkMemory); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), master, sparkMemory, client.getBasePath(), + compactionInstant, outputPathStr, parallelism); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); @@ -484,8 +484,8 @@ public class CompactionCommand implements CommandMarker { String sparkPropertiesPath = Utils .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), client.getBasePath(), - compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(skipV).toString(), + sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), master, sparkMemory, client.getBasePath(), + compactionInstant, outputPathStr, parallelism, Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString()); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); @@ -528,8 +528,8 @@ public class CompactionCommand implements CommandMarker { String sparkPropertiesPath = Utils .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), client.getBasePath(), - fileId, outputPathStr, "1", master, sparkMemory, Boolean.valueOf(skipV).toString(), + sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, client.getBasePath(), + fileId, outputPathStr, "1", Boolean.valueOf(skipV).toString(), Boolean.valueOf(dryRun).toString()); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); @@ -574,8 +574,8 @@ public class CompactionCommand implements CommandMarker { String sparkPropertiesPath = Utils .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), client.getBasePath(), - compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(dryRun).toString()); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), master, sparkMemory, client.getBasePath(), + compactionInstant, outputPathStr, parallelism, Boolean.valueOf(dryRun).toString()); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 89e62b9..e8f4c6a 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -22,6 +22,7 @@ import org.apache.hudi.cli.DedupeSparkJob; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -62,7 +63,9 @@ public class SparkMain { SparkCommand cmd = SparkCommand.valueOf(command); - JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command); + JavaSparkContext jsc = sparkMasterContained(cmd) + ? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2])) + : SparkUtil.initJavaSparkConf("hoodie-cli-" + command); int returnCode = 0; switch (cmd) { case ROLLBACK: @@ -118,38 +121,38 @@ public class SparkMain { break; case COMPACT_VALIDATE: assert (args.length == 7); - doCompactValidate(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6]); + doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6])); returnCode = 0; break; case COMPACT_REPAIR: assert (args.length == 8); - doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], + doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), Boolean.parseBoolean(args[7])); returnCode = 0; break; case COMPACT_UNSCHEDULE_FILE: assert (args.length == 9); - doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], + doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); returnCode = 0; break; case COMPACT_UNSCHEDULE_PLAN: assert (args.length == 9); - doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], + doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); returnCode = 0; break; case CLEAN: assert (args.length >= 5); propsFilePath = null; - if (!StringUtils.isNullOrEmpty(args[3])) { - propsFilePath = args[3]; + if (!StringUtils.isNullOrEmpty(args[4])) { + propsFilePath = args[4]; } configs = new ArrayList<>(); if (args.length > 5) { configs.addAll(Arrays.asList(args).subList(5, args.length)); } - clean(jsc, args[1], args[2], propsFilePath, args[4], configs); + clean(jsc, args[3], propsFilePath, configs); break; default: break; @@ -157,14 +160,16 @@ public class SparkMain { System.exit(returnCode); } - private static void clean(JavaSparkContext jsc, String basePath, String sparkMaster, String propsFilePath, - String sparkMemory, List<String> configs) throws Exception { + private static boolean sparkMasterContained(SparkCommand command) { + List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR, + SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN); + return masterContained.contains(command); + } + + private static void clean(JavaSparkContext jsc, String basePath, String propsFilePath, + List<String> configs) { HoodieCleaner.Config cfg = new HoodieCleaner.Config(); cfg.basePath = basePath; - if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { - jsc.getConf().setMaster(sparkMaster); - } - jsc.getConf().set("spark.executor.memory", sparkMemory); cfg.propsFilePath = propsFilePath; cfg.configs = configs; new HoodieCleaner(cfg, jsc).run(); @@ -172,7 +177,7 @@ public class SparkMain { private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName, String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory, - int retry, String propsFilePath, List<String> configs) { + int retry, String propsFilePath, List<String> configs) { Config cfg = new Config(); cfg.command = command; cfg.srcPath = srcPath; @@ -190,22 +195,18 @@ public class SparkMain { } private static void doCompactValidate(JavaSparkContext jsc, String basePath, String compactionInstant, - String outputPath, int parallelism, String sparkMaster, String sparkMemory) throws Exception { + String outputPath, int parallelism) throws Exception { HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); cfg.basePath = basePath; cfg.operation = Operation.VALIDATE; cfg.outputPath = outputPath; cfg.compactionInstantTime = compactionInstant; cfg.parallelism = parallelism; - if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { - jsc.getConf().setMaster(sparkMaster); - } - jsc.getConf().set("spark.executor.memory", sparkMemory); new HoodieCompactionAdminTool(cfg).run(jsc); } private static void doCompactRepair(JavaSparkContext jsc, String basePath, String compactionInstant, - String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean dryRun) throws Exception { + String outputPath, int parallelism, boolean dryRun) throws Exception { HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); cfg.basePath = basePath; cfg.operation = Operation.REPAIR; @@ -213,16 +214,11 @@ public class SparkMain { cfg.compactionInstantTime = compactionInstant; cfg.parallelism = parallelism; cfg.dryRun = dryRun; - if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { - jsc.getConf().setMaster(sparkMaster); - } - jsc.getConf().set("spark.executor.memory", sparkMemory); new HoodieCompactionAdminTool(cfg).run(jsc); } private static void doCompactUnschedule(JavaSparkContext jsc, String basePath, String compactionInstant, - String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation, - boolean dryRun) throws Exception { + String outputPath, int parallelism, boolean skipValidation, boolean dryRun) throws Exception { HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); cfg.basePath = basePath; cfg.operation = Operation.UNSCHEDULE_PLAN; @@ -231,15 +227,11 @@ public class SparkMain { cfg.parallelism = parallelism; cfg.dryRun = dryRun; cfg.skipValidation = skipValidation; - if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { - jsc.getConf().setMaster(sparkMaster); - } - jsc.getConf().set("spark.executor.memory", sparkMemory); new HoodieCompactionAdminTool(cfg).run(jsc); } private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String outputPath, - int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation, boolean dryRun) + int parallelism, boolean skipValidation, boolean dryRun) throws Exception { HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); cfg.basePath = basePath; @@ -249,16 +241,12 @@ public class SparkMain { cfg.parallelism = parallelism; cfg.dryRun = dryRun; cfg.skipValidation = skipValidation; - if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { - jsc.getConf().setMaster(sparkMaster); - } - jsc.getConf().set("spark.executor.memory", sparkMemory); new HoodieCompactionAdminTool(cfg).run(jsc); } private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath, - List<String> configs) { + List<String> configs) { HoodieCompactor.Config cfg = new HoodieCompactor.Config(); cfg.basePath = basePath; cfg.tableName = tableName; diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java index c7ac6dd..ba9d1e5 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java @@ -18,10 +18,12 @@ package org.apache.hudi.cli.utils; +import org.apache.hudi.cli.HoodieCliSparkConfig; import org.apache.hudi.cli.commands.SparkEnvCommand; import org.apache.hudi.cli.commands.SparkMain; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.spark.SparkConf; @@ -38,7 +40,7 @@ import java.util.Objects; */ public class SparkUtil { - public static final String DEFAULT_SPARK_MASTER = "yarn-client"; + private static final String DEFAULT_SPARK_MASTER = "yarn-client"; /** * TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro. @@ -61,29 +63,37 @@ public class SparkUtil { } public static JavaSparkContext initJavaSparkConf(String name) { + return initJavaSparkConf(name, Option.empty(), Option.empty()); + } + + public static JavaSparkContext initJavaSparkConf(String name, Option<String> master, + Option<String> executorMemory) { SparkConf sparkConf = new SparkConf().setAppName(name); - String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER"); - if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) { + String defMaster = master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER)); + if ((null == defMaster) || (defMaster.isEmpty())) { sparkConf.setMaster(DEFAULT_SPARK_MASTER); } else { - sparkConf.setMaster(defMasterFromEnv); + sparkConf.setMaster(defMaster); } - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - sparkConf.set("spark.driver.maxResultSize", "2g"); - sparkConf.set("spark.eventLog.overwrite", "true"); - sparkConf.set("spark.eventLog.enabled", "true"); + sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer"); + sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g"); + sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true"); + sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true"); + if (executorMemory.isPresent()) { + sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get()); + } // Configure hadoop conf - sparkConf.set("spark.hadoop.mapred.output.compress", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); - sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); + sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true"); + sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "true"); + sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec"); + sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK"); HoodieWriteClient.registerClasses(sparkConf); JavaSparkContext jsc = new JavaSparkContext(sparkConf); - jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata", false); + jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false); FSUtils.prepareHadoopConf(jsc.hadoopConfiguration()); return jsc; }