This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a64afdf  HUDI-528 Handle empty commit in incremental pulling (#1612)
a64afdf is described below

commit a64afdfd17ac974e451bceb877f3d40a9c775253
Author: Gary Li <yanjia.gary...@gmail.com>
AuthorDate: Thu May 14 22:55:25 2020 -0700

    HUDI-528 Handle empty commit in incremental pulling (#1612)
---
 .../org/apache/hudi/IncrementalRelation.scala      | 29 +++++++++-------------
 .../apache/hudi/functional/TestDataSource.scala    |  8 ++++++
 2 files changed, 20 insertions(+), 17 deletions(-)
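
For context on the change below: incremental pulls go through the Hudi Spark DataSource, roughly as in this minimal sketch (basePath and the begin instant "000" are hypothetical placeholders; the option keys are the same ones that appear in the diff). Before this fix, building the incremental relation read the table schema from a data file referenced by the latest commit, which fails when that commit is empty, i.e. wrote no data files.

    // A minimal sketch of an incremental pull (basePath and "000" are
    // hypothetical placeholders, not values from this commit).
    val incDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
        DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
      .load(basePath) // schema resolution happens here; an empty latest commit used to break it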

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 8bb4609..436895b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -19,9 +19,9 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.GlobPattern
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.util.ParquetUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.table.HoodieTable
@@ -47,8 +47,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[IncrementalRelation])
 
-  val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-  val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
+  private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
   // MOR tables not supported yet
   if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
     throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
@@ -56,7 +55,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
   // TODO : Figure out a valid HoodieWriteConfig
   private val hoodieTable = HoodieTable.create(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(),
     sqlContext.sparkContext.hadoopConfiguration)
-  val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
+  private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
   if (commitTimeline.empty()) {
     throw new HoodieException("No instants to incrementally pull")
   }
@@ -65,25 +64,21 @@ class IncrementalRelation(val sqlContext: SQLContext,
       s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
   }
 
-  val lastInstant = commitTimeline.lastInstant().get()
+  private val lastInstant = commitTimeline.lastInstant().get()
 
-  val commitsToReturn = commitTimeline.findInstantsInRange(
+  private val commitsToReturn = commitTimeline.findInstantsInRange(
     optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
     optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
     .getInstants.iterator().toList
 
-  // use schema from a file produced in the latest instant
-  val latestSchema = {
-    // use last instant if instant range is empty
-    val instant = commitsToReturn.lastOption.getOrElse(lastInstant)
-    val latestMeta = HoodieCommitMetadata
-          .fromBytes(commitTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
-    val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next()
-    AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(
-      sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath)))
+  // use schema from latest metadata, if not present, read schema from the data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
   }
 
-  val filters = {
+  private val filters = {
     if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
       val filterStr = optParams.getOrElse(
         DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
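
The decisive part of the hunk above is the latestSchema block: the old code looked up a file path in the latest commit's metadata and read the Avro schema out of that Parquet file, so a commit that wrote no files left nothing to read. The replacement resolves the schema from the table metadata via TableSchemaResolver. The same calls, pulled out of the diff as a standalone sketch (metaClient as constructed in the class above):

    // Sketch of the new schema lookup (same calls as the + lines above).
    val schemaUtil = new TableSchemaResolver(metaClient)
    // table schema without Hudi metadata fields, re-wrapped as a Hudi write schema
    val avroSchema = HoodieAvroUtils.createHoodieWriteSchema(
      schemaUtil.getTableAvroSchemaWithoutMetadataFields)
    // converted to a Spark StructType for the relation
    val latestSchema = AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
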
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
index fdd02bf..8352485 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
@@ -126,6 +126,14 @@ class TestDataSource {
     assertEquals(1, countsPerCommit.length)
     assertEquals(firstCommit, countsPerCommit(0).get(0))
 
+    // Upsert an empty dataFrame
+    val emptyRecords = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002", 0)).toList
+    val emptyDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
+    emptyDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
     // pull the latest commit
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
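
The added test lines upsert a DataFrame built from zero generated records; the write still completes a commit on the timeline but produces no data files, so the incremental pull that follows exercises exactly the empty-commit case fixed above. A hedged sketch of how that commit shows up on the timeline (the metaClient is constructed as in IncrementalRelation; basePath is the test table path):

    // The zero-record upsert above still becomes the latest completed instant,
    // so reading it incrementally must not depend on files written by it.
    val metaClient = new HoodieTableMetaClient(
      spark.sparkContext.hadoopConfiguration, basePath, true)
    val lastInstant = metaClient.getCommitTimeline.filterCompletedInstants()
      .lastInstant().get() // the empty commit written by the upsert above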
