This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d5b79d  [HUDI-438] Merge duplicated code fragment in HoodieSparkSqlWriter (#1114)
2d5b79d is described below

commit 2d5b79d96fa23571da5003fd4460d2d6d3998275
Author: hongdd <jn_...@163.com>
AuthorDate: Mon Jan 6 22:51:22 2020 +0800

    [HUDI-438] Merge duplicated code fragment in HoodieSparkSqlWriter (#1114)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 165 ++++++++-------------
 1 file changed, 63 insertions(+), 102 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index af19e28..62bdd19 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.HoodieRecordPayload
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.{FSUtils, TypedProperties}
@@ -74,19 +75,14 @@ private[hudi] object HoodieSparkSqlWriter {
         parameters(OPERATION_OPT_KEY)
       }
 
-    var writeSuccessful: Boolean = false
-    var writeStatuses: JavaRDD[WriteStatus] = null
-
     val jsc = new JavaSparkContext(sparkContext)
     val basePath = new Path(parameters("path"))
     val commitTime = HoodieActiveTimeline.createNewInstantTime();
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
 
-    // Running into issues wrt generic type conversion from Java to Scala.  Couldn't make common code paths for
-    // write and deletes. Specifically, instantiating client of type HoodieWriteClient<T extends HoodieRecordPayload>
-    // is having issues. Hence some codes blocks are same in both if and else blocks.
-    if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+    val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
+      if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
       // register classes & schemas
       val structName = s"${tblName.get}_record"
       val nameSpace = s"hoodie.${tblName.get}"
@@ -147,54 +143,8 @@ private[hudi] object HoodieSparkSqlWriter {
         (true, common.util.Option.empty())
       }
       client.startCommitWithTime(commitTime)
-      writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
-      // Check for errors and commit the write.
-      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-      writeSuccessful =
-        if (errorCount == 0) {
-          log.info("No errors. Proceeding to commit the write.")
-          val metaMap = parameters.filter(kv =>
-            kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-          val commitSuccess = if (metaMap.isEmpty) {
-            client.commit(commitTime, writeStatuses)
-          } else {
-            client.commit(commitTime, writeStatuses,
-              common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-          }
-
-          if (commitSuccess) {
-            log.info("Commit " + commitTime + " successful!")
-          }
-          else {
-            log.info("Commit " + commitTime + " failed!")
-          }
-
-          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-          val syncHiveSucess = if (hiveSyncEnabled) {
-            log.info("Syncing to Hive Metastore (URL: " + 
parameters(HIVE_URL_OPT_KEY) + ")")
-            val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-            syncHive(basePath, fs, parameters)
-          } else {
-            true
-          }
-          client.close()
-          commitSuccess && syncHiveSucess
-        } else {
-          log.error(s"$operation failed with ${errorCount} errors :");
-          if (log.isTraceEnabled) {
-            log.trace("Printing out the top 100 errors")
-            writeStatuses.rdd.filter(ws => ws.hasErrors)
-              .take(100)
-              .foreach(ws => {
-                log.trace("Global error :", ws.getGlobalError)
-                if (ws.getErrors.size() > 0) {
-                  ws.getErrors.foreach(kt =>
-                    log.trace(s"Error for key: ${kt._1}", kt._2))
-                }
-              })
-          }
-          false
-        }
+      val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
+      (writeStatuses, client)
     } else {
 
       // Handle save modes
@@ -225,55 +175,12 @@ private[hudi] object HoodieSparkSqlWriter {
 
       // Issue deletes
       client.startCommitWithTime(commitTime)
-      writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
-      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-      writeSuccessful =
-        if (errorCount == 0) {
-          log.info("No errors. Proceeding to commit the write.")
-          val metaMap = parameters.filter(kv =>
-            kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-          val commitSuccess = if (metaMap.isEmpty) {
-            client.commit(commitTime, writeStatuses)
-          } else {
-            client.commit(commitTime, writeStatuses,
-              common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-          }
-
-          if (commitSuccess) {
-            log.info("Commit " + commitTime + " successful!")
-          }
-          else {
-            log.info("Commit " + commitTime + " failed!")
-          }
-
-          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-          val syncHiveSucess = if (hiveSyncEnabled) {
-            log.info("Syncing to Hive Metastore (URL: " + 
parameters(HIVE_URL_OPT_KEY) + ")")
-            val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-            syncHive(basePath, fs, parameters)
-          } else {
-            true
-          }
-          client.close()
-          commitSuccess && syncHiveSucess
-        } else {
-          log.error(s"$operation failed with ${errorCount} errors :");
-          if (log.isTraceEnabled) {
-            log.trace("Printing out the top 100 errors")
-            writeStatuses.rdd.filter(ws => ws.hasErrors)
-              .take(100)
-              .foreach(ws => {
-                log.trace("Global error :", ws.getGlobalError)
-                if (ws.getErrors.size() > 0) {
-                  ws.getErrors.foreach(kt =>
-                    log.trace(s"Error for key: ${kt._1}", kt._2))
-                }
-              })
-          }
-          false
-        }
+      val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
+      (writeStatuses, client)
     }
 
+    // Check for errors and commit the write.
+    val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, commitTime, basePath, operation, jsc)
     (writeSuccessful, common.util.Option.ofNullable(commitTime))
   }
 
@@ -340,4 +247,58 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig
   }
+
+  private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus],
+                               parameters: Map[String, String],
+                               client: HoodieWriteClient[_],
+                               commitTime: String,
+                               basePath: Path,
+                               operation: String,
+                               jsc: JavaSparkContext): Boolean = {
+    val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
+    if (errorCount == 0) {
+      log.info("No errors. Proceeding to commit the write.")
+      val metaMap = parameters.filter(kv =>
+        kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+      val commitSuccess = if (metaMap.isEmpty) {
+        client.commit(commitTime, writeStatuses)
+      } else {
+        client.commit(commitTime, writeStatuses,
+          common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+      }
+
+      if (commitSuccess) {
+        log.info("Commit " + commitTime + " successful!")
+      }
+      else {
+        log.info("Commit " + commitTime + " failed!")
+      }
+
+      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+      val syncHiveSucess = if (hiveSyncEnabled) {
+        log.info("Syncing to Hive Metastore (URL: " + 
parameters(HIVE_URL_OPT_KEY) + ")")
+        val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+        syncHive(basePath, fs, parameters)
+      } else {
+        true
+      }
+      client.close()
+      commitSuccess && syncHiveSucess
+    } else {
+      log.error(s"$operation failed with ${errorCount} errors :");
+      if (log.isTraceEnabled) {
+        log.trace("Printing out the top 100 errors")
+        writeStatuses.rdd.filter(ws => ws.hasErrors)
+          .take(100)
+          .foreach(ws => {
+            log.trace("Global error :", ws.getGlobalError)
+            if (ws.getErrors.size() > 0) {
+              ws.getErrors.foreach(kt =>
+                log.trace(s"Error for key: ${kt._1}", kt._2))
+            }
+          })
+      }
+      false
+    }
+  }
 }
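
For context, the refactoring in this commit follows the pattern sketched below: the insert/upsert and delete branches previously inlined identical error-check, commit and Hive-sync logic; after the change each branch returns only its write statuses plus the client as a tuple, and a single helper (checkWriteStatus in the diff above) performs the shared follow-up. This is a minimal, framework-free Scala illustration of that shape, not Hudi code; the names DedupSketch, Status, Client, finalizeWrite and write are made up for the example.

object DedupSketch {
  // Stand-ins for Hudi's WriteStatus and HoodieWriteClient (illustrative only).
  final case class Status(hasErrors: Boolean)
  final class Client {
    def commit(time: String): Boolean = true
    def close(): Unit = ()
  }

  // Shared post-write logic, analogous to the extracted checkWriteStatus helper.
  def finalizeWrite(statuses: Seq[Status], client: Client, commitTime: String): Boolean = {
    val errorCount = statuses.count(_.hasErrors)
    val ok =
      if (errorCount == 0) client.commit(commitTime)
      else { println(s"write failed with $errorCount errors"); false }
    client.close()
    ok
  }

  def write(isDelete: Boolean, commitTime: String): Boolean = {
    // Each branch only produces its own statuses; the follow-up is shared via the tuple.
    val (statuses, client) =
      if (!isDelete) (Seq(Status(hasErrors = false)), new Client) // upsert/insert path
      else (Seq(Status(hasErrors = false)), new Client)           // delete path
    finalizeWrite(statuses, client, commitTime)
  }

  def main(args: Array[String]): Unit =
    println(write(isDelete = false, commitTime = "20200106225122"))
}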
