This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 2d5b79d  [HUDI-438] Merge duplicated code fragment in HoodieSparkSqlWriter (#1114)
2d5b79d is described below

commit 2d5b79d96fa23571da5003fd4460d2d6d3998275
Author: hongdd <jn_...@163.com>
AuthorDate: Mon Jan 6 22:51:22 2020 +0800

    [HUDI-438] Merge duplicated code fragment in HoodieSparkSqlWriter (#1114)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala | 165 ++++++++-------
 1 file changed, 63 insertions(+), 102 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index af19e28..62bdd19 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.HoodieRecordPayload
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.{FSUtils, TypedProperties}
@@ -74,19 +75,14 @@ private[hudi] object HoodieSparkSqlWriter {
       parameters(OPERATION_OPT_KEY)
     }
 
-    var writeSuccessful: Boolean = false
-    var writeStatuses: JavaRDD[WriteStatus] = null
-
     val jsc = new JavaSparkContext(sparkContext)
     val basePath = new Path(parameters("path"))
     val commitTime = HoodieActiveTimeline.createNewInstantTime();
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
 
-    // Running into issues wrt generic type conversion from Java to Scala. Couldn't make common code paths for
-    // write and deletes. Specifically, instantiating client of type HoodieWriteClient<T extends HoodieRecordPayload>
-    // is having issues. Hence some codes blocks are same in both if and else blocks.
-    if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+    val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
+      if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
       // register classes & schemas
       val structName = s"${tblName.get}_record"
       val nameSpace = s"hoodie.${tblName.get}"
@@ -147,54 +143,8 @@ private[hudi] object HoodieSparkSqlWriter {
           (true, common.util.Option.empty())
         }
       client.startCommitWithTime(commitTime)
-      writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
-      // Check for errors and commit the write.
-      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-      writeSuccessful =
-        if (errorCount == 0) {
-          log.info("No errors. Proceeding to commit the write.")
-          val metaMap = parameters.filter(kv =>
-            kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-          val commitSuccess = if (metaMap.isEmpty) {
-            client.commit(commitTime, writeStatuses)
-          } else {
-            client.commit(commitTime, writeStatuses,
-              common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-          }
-
-          if (commitSuccess) {
-            log.info("Commit " + commitTime + " successful!")
-          }
-          else {
-            log.info("Commit " + commitTime + " failed!")
-          }
-
-          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-          val syncHiveSucess = if (hiveSyncEnabled) {
-            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
-            val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-            syncHive(basePath, fs, parameters)
-          } else {
-            true
-          }
-          client.close()
-          commitSuccess && syncHiveSucess
-        } else {
-          log.error(s"$operation failed with ${errorCount} errors :");
-          if (log.isTraceEnabled) {
-            log.trace("Printing out the top 100 errors")
-            writeStatuses.rdd.filter(ws => ws.hasErrors)
-              .take(100)
-              .foreach(ws => {
-                log.trace("Global error :", ws.getGlobalError)
-                if (ws.getErrors.size() > 0) {
-                  ws.getErrors.foreach(kt =>
-                    log.trace(s"Error for key: ${kt._1}", kt._2))
-                }
-              })
-          }
-          false
-        }
+      val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
+      (writeStatuses, client)
     } else {
 
       // Handle save modes
@@ -225,55 +175,12 @@ private[hudi] object HoodieSparkSqlWriter {
 
       // Issue deletes
       client.startCommitWithTime(commitTime)
-      writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
-      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-      writeSuccessful =
-        if (errorCount == 0) {
-          log.info("No errors. Proceeding to commit the write.")
-          val metaMap = parameters.filter(kv =>
-            kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-          val commitSuccess = if (metaMap.isEmpty) {
-            client.commit(commitTime, writeStatuses)
-          } else {
-            client.commit(commitTime, writeStatuses,
-              common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-          }
-
-          if (commitSuccess) {
-            log.info("Commit " + commitTime + " successful!")
-          }
-          else {
-            log.info("Commit " + commitTime + " failed!")
-          }
-
-          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-          val syncHiveSucess = if (hiveSyncEnabled) {
-            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
-            val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-            syncHive(basePath, fs, parameters)
-          } else {
-            true
-          }
-          client.close()
-          commitSuccess && syncHiveSucess
-        } else {
-          log.error(s"$operation failed with ${errorCount} errors :");
-          if (log.isTraceEnabled) {
-            log.trace("Printing out the top 100 errors")
-            writeStatuses.rdd.filter(ws => ws.hasErrors)
-              .take(100)
-              .foreach(ws => {
-                log.trace("Global error :", ws.getGlobalError)
-                if (ws.getErrors.size() > 0) {
-                  ws.getErrors.foreach(kt =>
-                    log.trace(s"Error for key: ${kt._1}", kt._2))
-                }
-              })
-          }
-          false
-        }
+      val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
+      (writeStatuses, client)
     }
+
+    // Check for errors and commit the write.
+    val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, commitTime, basePath, operation, jsc)
 
     (writeSuccessful, common.util.Option.ofNullable(commitTime))
   }
@@ -340,4 +247,58 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig
   }
+
+  private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus],
+                               parameters: Map[String, String],
+                               client: HoodieWriteClient[_],
+                               commitTime: String,
+                               basePath: Path,
+                               operation: String,
+                               jsc: JavaSparkContext): Boolean = {
+    val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
+    if (errorCount == 0) {
+      log.info("No errors. Proceeding to commit the write.")
+      val metaMap = parameters.filter(kv =>
+        kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+      val commitSuccess = if (metaMap.isEmpty) {
+        client.commit(commitTime, writeStatuses)
+      } else {
+        client.commit(commitTime, writeStatuses,
+          common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+      }
+
+      if (commitSuccess) {
+        log.info("Commit " + commitTime + " successful!")
+      }
+      else {
+        log.info("Commit " + commitTime + " failed!")
+      }
+
+      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+      val syncHiveSucess = if (hiveSyncEnabled) {
+        log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+        val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+        syncHive(basePath, fs, parameters)
+      } else {
+        true
+      }
+      client.close()
+      commitSuccess && syncHiveSucess
+    } else {
+      log.error(s"$operation failed with ${errorCount} errors :");
+      if (log.isTraceEnabled) {
+        log.trace("Printing out the top 100 errors")
+        writeStatuses.rdd.filter(ws => ws.hasErrors)
+          .take(100)
+          .foreach(ws => {
+            log.trace("Global error :", ws.getGlobalError)
+            if (ws.getErrors.size() > 0) {
+              ws.getErrors.foreach(kt =>
+                log.trace(s"Error for key: ${kt._1}", kt._2))
+            }
+          })
+      }
+      false
+    }
+  }
 }
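The pattern behind this cleanup is plain Scala, independent of Hudi: treat the if/else as an expression that yields the same tuple shape from both branches, destructure it into vals, and run the formerly duplicated tail logic once on those vals. Below is a minimal, self-contained sketch of that shape; every name in it is illustrative stand-in code, not Hudi code.

    // Minimal sketch (illustrative names only). Both branches of the if/else
    // expression produce the same (statuses, client) tuple shape, so the
    // previously duplicated error-check tail runs exactly once afterwards.
    object DedupBranchesSketch {

      final case class Client(name: String) {
        def close(): Unit = println(s"closing $name")
      }

      // Stand-ins for DataSourceUtils.doWriteOperation / doDeleteOperation.
      def doWrite(): List[Int] = List(1, 2, 3)
      def doDelete(): List[Int] = List(4, 5)

      // Stand-in for checkWriteStatus: the single shared copy of the logic
      // that used to appear verbatim in both the write and delete branches.
      def checkStatus(statuses: List[Int], client: Client): Boolean = {
        val errorCount = statuses.count(_ < 0) // negative values model failed records
        client.close()
        errorCount == 0
      }

      def main(args: Array[String]): Unit = {
        val operation = if (args.nonEmpty) args(0) else "upsert"

        // The if/else is an expression: bind both results in one destructuring val.
        val (statuses, client) =
          if (!operation.equalsIgnoreCase("delete")) {
            (doWrite(), Client("write-client"))
          } else {
            (doDelete(), Client("delete-client"))
          }

        // The shared tail, invoked once regardless of which branch ran.
        val succeeded = checkStatus(statuses, client)
        println(s"$operation succeeded: $succeeded")
      }
    }

Typing the shared helper's client parameter with a wildcard (HoodieWriteClient[_] in the actual change) is what sidesteps the Java-to-Scala generic-conversion problem described in the comment this commit removes.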