This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e5c867  [HUDI-740] Fix inability to specify sparkMaster, and clean up code in SparkUtil (#1452)
4e5c867 is described below

commit 4e5c8671ef3213ffa5c40f09aae27aacfa20f907
Author: hongdd <jn_...@163.com>
AuthorDate: Wed Apr 8 21:33:15 2020 +0800

    [HUDI-740] Fix inability to specify sparkMaster, and clean up code in SparkUtil (#1452)
---
 .../org/apache/hudi/cli/HoodieCliSparkConfig.java  | 46 ++++++++++++++++
 .../apache/hudi/cli/commands/CleansCommand.java    |  2 +-
 .../hudi/cli/commands/CompactionCommand.java       | 16 +++---
 .../org/apache/hudi/cli/commands/SparkMain.java    | 62 +++++++++-------------
 .../java/org/apache/hudi/cli/utils/SparkUtil.java  | 36 ++++++++-----
 5 files changed, 103 insertions(+), 59 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java
new file mode 100644
index 0000000..0d64135
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+/**
+ * Class storing configs for initializing Spark.
+ */
+public class HoodieCliSparkConfig {
+  /**
+   * Configs to start the Spark application.
+   */
+  public static final String CLI_SPARK_MASTER = "SPARK_MASTER";
+  public static final String CLI_SERIALIZER = "spark.serializer";
+  public static final String CLI_DRIVER_MAX_RESULT_SIZE = "spark.driver.maxResultSize";
+  public static final String CLI_EVENT_LOG_OVERWRITE = "spark.eventLog.overwrite";
+  public static final String CLI_EVENT_LOG_ENABLED = "spark.eventLog.enabled";
+  public static final String CLI_EXECUTOR_MEMORY = "spark.executor.memory";
+
+  /**
+   * Hadoop output config.
+   */
+  public static final String CLI_MAPRED_OUTPUT_COMPRESS = "spark.hadoop.mapred.output.compress";
+  public static final String CLI_MAPRED_OUTPUT_COMPRESSION_CODEC = "spark.hadoop.mapred.output.compression.codec";
+  public static final String CLI_MAPRED_OUTPUT_COMPRESSION_TYPE = "spark.hadoop.mapred.output.compression.type";
+
+  /**
+   * Parquet file config.
+   */
+  public static final String CLI_PARQUET_ENABLE_SUMMARY_METADATA = "parquet.enable.summary-metadata";
+}
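
For context, a minimal sketch of how these constants are consumed when assembling a SparkConf. The constants come from the new class above; the wrapper class and values here are hypothetical, not part of the commit:

    import org.apache.hudi.cli.HoodieCliSparkConfig;
    import org.apache.spark.SparkConf;

    public class SparkConfSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("hoodie-cli-sketch");
        // Fall back to the SPARK_MASTER environment variable when no master is given.
        String master = System.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER);
        conf.setMaster(master == null || master.isEmpty() ? "yarn-client" : master);
        conf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
        conf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
        System.out.println(conf.toDebugString());
      }
    }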
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
index 34321ef..609e44b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
@@ -139,7 +139,7 @@ public class CleansCommand implements CommandMarker {
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
 
     String cmd = SparkMain.SparkCommand.CLEAN.toString();
-    sparkLauncher.addAppArgs(cmd, metaClient.getBasePath(), master, propsFilePath, sparkMemory);
+    sparkLauncher.addAppArgs(cmd, master, sparkMemory, metaClient.getBasePath(), propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
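
The CLEAN launcher now places master and sparkMemory directly after the command token, matching the fixed positions SparkMain reads them from (args[1] and args[2]). A sketch of the resulting argument layout, with hypothetical values:

    // Hypothetical CLEAN argument vector under the new fixed ordering.
    public class CleanArgsSketch {
      public static void main(String[] ignored) {
        String[] args = {
            "CLEAN",            // args[0]: SparkMain.SparkCommand name
            "local[2]",         // args[1]: spark master (hypothetical value)
            "4g",               // args[2]: spark executor memory (hypothetical value)
            "/tmp/hoodie/tbl",  // args[3]: table base path (hypothetical value)
            ""                  // args[4]: optional properties file path
        };
        System.out.println(String.join(" ", args));
      }
    }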
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 0843a87..a4c70da 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -423,8 +423,8 @@ public class CompactionCommand implements CommandMarker {
      String sparkPropertiesPath = Utils
          .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
      SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-      sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), client.getBasePath(),
-          compactionInstant, outputPathStr, parallelism, master, sparkMemory);
+      sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), master, sparkMemory, client.getBasePath(),
+          compactionInstant, outputPathStr, parallelism);
       Process process = sparkLauncher.launch();
       InputStreamConsumer.captureOutput(process);
       int exitCode = process.waitFor();
@@ -484,8 +484,8 @@ public class CompactionCommand implements CommandMarker {
      String sparkPropertiesPath = Utils
          .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
      SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-      sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), client.getBasePath(),
-          compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(skipV).toString(),
+      sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), master, sparkMemory, client.getBasePath(),
+          compactionInstant, outputPathStr, parallelism, Boolean.valueOf(skipV).toString(),
           Boolean.valueOf(dryRun).toString());
       Process process = sparkLauncher.launch();
       InputStreamConsumer.captureOutput(process);
@@ -528,8 +528,8 @@ public class CompactionCommand implements CommandMarker {
      String sparkPropertiesPath = Utils
          .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
      SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-      sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), client.getBasePath(),
-          fileId, outputPathStr, "1", master, sparkMemory, Boolean.valueOf(skipV).toString(),
+      sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, client.getBasePath(),
+          fileId, outputPathStr, "1", Boolean.valueOf(skipV).toString(),
           Boolean.valueOf(dryRun).toString());
       Process process = sparkLauncher.launch();
       InputStreamConsumer.captureOutput(process);
@@ -574,8 +574,8 @@ public class CompactionCommand implements CommandMarker {
      String sparkPropertiesPath = Utils
          .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
      SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-      sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), client.getBasePath(),
-          compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(dryRun).toString());
+      sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), master, sparkMemory, client.getBasePath(),
+          compactionInstant, outputPathStr, parallelism, Boolean.valueOf(dryRun).toString());
       Process process = sparkLauncher.launch();
       InputStreamConsumer.captureOutput(process);
       int exitCode = process.waitFor();
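
All four compaction subcommands now follow the same convention: master and sparkMemory occupy fixed positions right after the command name, with command-specific arguments trailing. A sketch of launching COMPACT_VALIDATE under the new ordering (jar path and values are hypothetical):

    import org.apache.spark.launcher.SparkLauncher;

    public class CompactValidateLaunchSketch {
      public static void main(String[] args) throws Exception {
        Process process = new SparkLauncher()
            .setAppResource("/path/to/hudi-cli.jar")  // hypothetical jar location
            .setMainClass("org.apache.hudi.cli.commands.SparkMain")
            .addAppArgs("COMPACT_VALIDATE",
                "local[2]",           // master at args[1]
                "4g",                 // sparkMemory at args[2]
                "/tmp/hoodie/tbl",    // basePath (hypothetical)
                "20200408213315",     // compactionInstant (hypothetical)
                "/tmp/validate-out",  // outputPath (hypothetical)
                "3")                  // parallelism
            .launch();
        process.waitFor();
      }
    }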
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 89e62b9..e8f4c6a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -22,6 +22,7 @@ import org.apache.hudi.cli.DedupeSparkJob;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -62,7 +63,9 @@ public class SparkMain {
 
     SparkCommand cmd = SparkCommand.valueOf(command);
 
-    JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
+    JavaSparkContext jsc = sparkMasterContained(cmd)
+        ? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]))
+        : SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
     int returnCode = 0;
     switch (cmd) {
       case ROLLBACK:
@@ -118,38 +121,38 @@ public class SparkMain {
         break;
       case COMPACT_VALIDATE:
         assert (args.length == 7);
-        doCompactValidate(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6]);
+        doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]));
         returnCode = 0;
         break;
       case COMPACT_REPAIR:
         assert (args.length == 8);
-        doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
+        doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
             Boolean.parseBoolean(args[7]));
         returnCode = 0;
         break;
       case COMPACT_UNSCHEDULE_FILE:
         assert (args.length == 9);
-        doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
+        doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
             Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case COMPACT_UNSCHEDULE_PLAN:
         assert (args.length == 9);
-        doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
+        doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
             Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case CLEAN:
         assert (args.length >= 5);
         propsFilePath = null;
-        if (!StringUtils.isNullOrEmpty(args[3])) {
-          propsFilePath = args[3];
+        if (!StringUtils.isNullOrEmpty(args[4])) {
+          propsFilePath = args[4];
         }
         configs = new ArrayList<>();
         if (args.length > 5) {
           configs.addAll(Arrays.asList(args).subList(5, args.length));
         }
-        clean(jsc, args[1], args[2], propsFilePath, args[4], configs);
+        clean(jsc, args[3], propsFilePath, configs);
         break;
       default:
         break;
@@ -157,14 +160,16 @@ public class SparkMain {
     System.exit(returnCode);
   }
 
-  private static void clean(JavaSparkContext jsc, String basePath, String sparkMaster, String propsFilePath,
-                            String sparkMemory, List<String> configs) throws Exception {
+  private static boolean sparkMasterContained(SparkCommand command) {
+    List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
+        SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN);
+    return masterContained.contains(command);
+  }
+
+  private static void clean(JavaSparkContext jsc, String basePath, String propsFilePath,
+      List<String> configs) {
     HoodieCleaner.Config cfg = new HoodieCleaner.Config();
     cfg.basePath = basePath;
-    if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
-      jsc.getConf().setMaster(sparkMaster);
-    }
-    jsc.getConf().set("spark.executor.memory", sparkMemory);
     cfg.propsFilePath = propsFilePath;
     cfg.configs = configs;
     new HoodieCleaner(cfg, jsc).run();
@@ -172,7 +177,7 @@ public class SparkMain {
 
   private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
       String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory,
-                              int retry, String propsFilePath, List<String> configs) {
+      int retry, String propsFilePath, List<String> configs) {
     Config cfg = new Config();
     cfg.command = command;
     cfg.srcPath = srcPath;
@@ -190,22 +195,18 @@ public class SparkMain {
   }
 
   private static void doCompactValidate(JavaSparkContext jsc, String basePath, String compactionInstant,
-      String outputPath, int parallelism, String sparkMaster, String sparkMemory) throws Exception {
+      String outputPath, int parallelism) throws Exception {
     HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
     cfg.basePath = basePath;
     cfg.operation = Operation.VALIDATE;
     cfg.outputPath = outputPath;
     cfg.compactionInstantTime = compactionInstant;
     cfg.parallelism = parallelism;
-    if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
-      jsc.getConf().setMaster(sparkMaster);
-    }
-    jsc.getConf().set("spark.executor.memory", sparkMemory);
     new HoodieCompactionAdminTool(cfg).run(jsc);
   }
 
   private static void doCompactRepair(JavaSparkContext jsc, String basePath, String compactionInstant,
-      String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean dryRun) throws Exception {
+      String outputPath, int parallelism, boolean dryRun) throws Exception {
     HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
     cfg.basePath = basePath;
     cfg.operation = Operation.REPAIR;
@@ -213,16 +214,11 @@ public class SparkMain {
     cfg.compactionInstantTime = compactionInstant;
     cfg.parallelism = parallelism;
     cfg.dryRun = dryRun;
-    if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
-      jsc.getConf().setMaster(sparkMaster);
-    }
-    jsc.getConf().set("spark.executor.memory", sparkMemory);
     new HoodieCompactionAdminTool(cfg).run(jsc);
   }
 
   private static void doCompactUnschedule(JavaSparkContext jsc, String basePath, String compactionInstant,
-      String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation,
-      boolean dryRun) throws Exception {
+      String outputPath, int parallelism, boolean skipValidation, boolean dryRun) throws Exception {
     HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
     cfg.basePath = basePath;
     cfg.operation = Operation.UNSCHEDULE_PLAN;
@@ -231,15 +227,11 @@ public class SparkMain {
     cfg.parallelism = parallelism;
     cfg.dryRun = dryRun;
     cfg.skipValidation = skipValidation;
-    if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
-      jsc.getConf().setMaster(sparkMaster);
-    }
-    jsc.getConf().set("spark.executor.memory", sparkMemory);
     new HoodieCompactionAdminTool(cfg).run(jsc);
   }
 
   private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String outputPath,
-      int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation, boolean dryRun)
+      int parallelism, boolean skipValidation, boolean dryRun)
       throws Exception {
     HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
     cfg.basePath = basePath;
@@ -249,16 +241,12 @@ public class SparkMain {
     cfg.parallelism = parallelism;
     cfg.dryRun = dryRun;
     cfg.skipValidation = skipValidation;
-    if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
-      jsc.getConf().setMaster(sparkMaster);
-    }
-    jsc.getConf().set("spark.executor.memory", sparkMemory);
     new HoodieCompactionAdminTool(cfg).run(jsc);
   }
 
   private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
       int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath,
-                             List<String> configs) {
+      List<String> configs) {
     HoodieCompactor.Config cfg = new HoodieCompactor.Config();
     cfg.basePath = basePath;
     cfg.tableName = tableName;
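
The key behavioral fix in SparkMain: the old code called jsc.getConf().setMaster(...) inside each handler, after the JavaSparkContext had already been created, which has no effect on a live context. The master (and executor memory) must be applied before construction, which is what the new Option-taking overload does. A condensed sketch of the new entry-point flow, using names from the diff (the wrapper class and the hard-coded boolean are hypothetical):

    import org.apache.hudi.cli.utils.SparkUtil;
    import org.apache.hudi.common.util.Option;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkMainDispatchSketch {
      public static void main(String[] args) {
        // Stands in for sparkMasterContained(cmd) from the diff; true for
        // CLEAN and the COMPACT_* admin commands.
        boolean masterContained = true;
        JavaSparkContext jsc = masterContained
            ? SparkUtil.initJavaSparkConf("hoodie-cli-" + args[0], Option.of(args[1]), Option.of(args[2]))
            : SparkUtil.initJavaSparkConf("hoodie-cli-" + args[0]);
        jsc.stop();
      }
    }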
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index c7ac6dd..ba9d1e5 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -18,10 +18,12 @@
 
 package org.apache.hudi.cli.utils;
 
+import org.apache.hudi.cli.HoodieCliSparkConfig;
 import org.apache.hudi.cli.commands.SparkEnvCommand;
 import org.apache.hudi.cli.commands.SparkMain;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 
 import org.apache.spark.SparkConf;
@@ -38,7 +40,7 @@ import java.util.Objects;
  */
 public class SparkUtil {
 
-  public static final String DEFAULT_SPARK_MASTER = "yarn-client";
+  private static final String DEFAULT_SPARK_MASTER = "yarn-client";
 
   /**
   * TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro.
@@ -61,29 +63,37 @@ public class SparkUtil {
   }
 
   public static JavaSparkContext initJavaSparkConf(String name) {
+    return initJavaSparkConf(name, Option.empty(), Option.empty());
+  }
+
+  public static JavaSparkContext initJavaSparkConf(String name, Option<String> master,
+      Option<String> executorMemory) {
     SparkConf sparkConf = new SparkConf().setAppName(name);
 
-    String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER");
-    if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
+    String defMaster = master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER));
+    if ((null == defMaster) || (defMaster.isEmpty())) {
       sparkConf.setMaster(DEFAULT_SPARK_MASTER);
     } else {
-      sparkConf.setMaster(defMasterFromEnv);
+      sparkConf.setMaster(defMaster);
     }
 
-    sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
-    sparkConf.set("spark.driver.maxResultSize", "2g");
-    sparkConf.set("spark.eventLog.overwrite", "true");
-    sparkConf.set("spark.eventLog.enabled", "true");
+    sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
+    sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
+    sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true");
+    if (executorMemory.isPresent()) {
+      sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
+    }
 
     // Configure hadoop conf
-    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
-    sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
-    sparkConf.set("spark.hadoop.mapred.output.compression.codec", 
"org.apache.hadoop.io.compress.GzipCodec");
-    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
+    sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
+    sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "true");
+    sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
+    sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
 
     HoodieWriteClient.registerClasses(sparkConf);
     JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-    jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata", false);
+    jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
     FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
     return jsc;
   }
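
A minimal usage sketch of the new overload; when both Options are empty, behavior matches the old single-argument method (SPARK_MASTER environment variable, then the yarn-client default). The wrapper class and values here are hypothetical:

    import org.apache.hudi.cli.utils.SparkUtil;
    import org.apache.hudi.common.util.Option;
    import org.apache.spark.api.java.JavaSparkContext;

    public class InitSparkConfSketch {
      public static void main(String[] args) {
        // An explicit master and executor memory take precedence over SPARK_MASTER.
        JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-sketch",
            Option.of("local[2]"), Option.of("2g"));
        System.out.println(jsc.getConf().get("spark.master"));  // prints local[2]
        jsc.stop();
      }
    }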
