This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new a64afdf  HUDI-528 Handle empty commit in incremental pulling (#1612)
a64afdf is described below

commit a64afdfd17ac974e451bceb877f3d40a9c775253
Author: Gary Li <yanjia.gary...@gmail.com>
AuthorDate: Thu May 14 22:55:25 2020 -0700

    HUDI-528 Handle empty commit in incremental pulling (#1612)
---
 .../org/apache/hudi/IncrementalRelation.scala   | 29 +++++++++-------------
 .../apache/hudi/functional/TestDataSource.scala |  8 ++++++
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 8bb4609..436895b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -19,9 +19,9 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.GlobPattern
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.util.ParquetUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.table.HoodieTable
@@ -47,8 +47,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[IncrementalRelation])
 
-  val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-  val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
+  private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
   // MOR tables not supported yet
   if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
     throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
@@ -56,7 +55,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
   // TODO : Figure out a valid HoodieWriteConfig
   private val hoodieTable = HoodieTable.create(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(),
     sqlContext.sparkContext.hadoopConfiguration)
-  val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
+  private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
   if (commitTimeline.empty()) {
     throw new HoodieException("No instants to incrementally pull")
   }
@@ -65,25 +64,21 @@ class IncrementalRelation(val sqlContext: SQLContext,
     s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
   }
 
-  val lastInstant = commitTimeline.lastInstant().get()
+  private val lastInstant = commitTimeline.lastInstant().get()
 
-  val commitsToReturn = commitTimeline.findInstantsInRange(
+  private val commitsToReturn = commitTimeline.findInstantsInRange(
     optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
     optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
     .getInstants.iterator().toList
 
-  // use schema from a file produced in the latest instant
-  val latestSchema = {
-    // use last instant if instant range is empty
-    val instant = commitsToReturn.lastOption.getOrElse(lastInstant)
-    val latestMeta = HoodieCommitMetadata
-      .fromBytes(commitTimeline.getInstantDetails(instant).get,
-        classOf[HoodieCommitMetadata])
-    val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next()
-    AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(
-      sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath)))
+  // use schema from latest metadata, if not present, read schema from the data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
   }
 
-  val filters = {
+  private val filters = {
     if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
       val filterStr = optParams.getOrElse(
         DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
index fdd02bf..8352485 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
@@ -126,6 +126,14 @@ class TestDataSource {
     assertEquals(1, countsPerCommit.length)
     assertEquals(firstCommit, countsPerCommit(0).get(0))
 
+    // Upsert an empty dataFrame
+    val emptyRecords = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002", 0)).toList
+    val emptyDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
+    emptyDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
     // pull the latest commit
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
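
For context, the core of the fix can be read in isolation. Below is a minimal sketch of the new schema lookup, assuming metaClient is the HoodieTableMetaClient constructed in IncrementalRelation above; every call shown comes from the diff:

    import org.apache.hudi.AvroConversionUtils
    import org.apache.hudi.avro.HoodieAvroUtils
    import org.apache.hudi.common.table.TableSchemaResolver

    // Resolve the table-level Avro schema instead of reading it from a data
    // file referenced by the latest commit's metadata. An empty commit writes
    // no data files, so the old per-commit lookup had nothing to open.
    val schemaUtil = new TableSchemaResolver(metaClient)
    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
    // Convert to the Spark StructType that IncrementalRelation exposes
    val latestSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)

Because the schema now comes from the table rather than from whichever instant happens to close the requested range, incremental pulling no longer depends on the contents of any single commit.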
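The new test covers the reader path end to end: upserting an empty DataFrame still creates a commit instant, and an incremental pull spanning that instant must now succeed. A hedged sketch of such a pull follows; the base path and begin instant are illustrative placeholders, while the option keys are the ones used in the diff:

    import org.apache.hudi.DataSourceReadOptions
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("hudi-inc-pull").master("local[2]").getOrCreate()
    val basePath = "/tmp/hudi_test_table"  // placeholder table location

    // Incremental pull across a range whose newest commit may be empty.
    // Before this patch the schema lookup failed on such a commit; now the
    // query simply returns the rows written by the non-empty commits.
    val incDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")  // placeholder begin instant
      .load(basePath)
    incDF.count()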