This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 530640f61a5 [HUDI-7055] Support reading only log files in file group 
reader-based Spark parquet file format (#10020)
530640f61a5 is described below

commit 530640f61a5e3f8103d0b66ff866a2de995156b7
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Wed Nov 8 18:45:45 2023 -0800

    [HUDI-7055] Support reading only log files in file group reader-based Spark 
parquet file format (#10020)
---
 .../SparkFileFormatInternalRowReaderContext.scala  | 16 +++--
 .../table/read/TestHoodieFileGroupReaderBase.java  | 71 ++++++++++++++++------
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 39 +++++++-----
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  6 +-
 4 files changed, 92 insertions(+), 40 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index af3d3fd239c..beca8852686 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -44,10 +44,12 @@ import scala.collection.mutable
  *
  * This uses Spark parquet reader to read parquet data files or parquet log 
blocks.
  *
- * @param baseFileReader  A reader that transforms a {@link PartitionedFile} 
to an iterator of {@link InternalRow}.
+ * @param baseFileReader  A reader that transforms a {@link PartitionedFile} 
to an iterator of
+ *                        {@link InternalRow}. This is required for reading 
the base file and
+ *                        not required for reading a file group with only log 
files.
  * @param partitionValues The values for a partition in which the file group 
lives.
  */
-class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile 
=> Iterator[InternalRow],
+class SparkFileFormatInternalRowReaderContext(baseFileReader: 
Option[PartitionedFile => Iterator[InternalRow]],
                                               partitionValues: InternalRow) 
extends BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
   lazy val sparkFileReaderFactory = new HoodieSparkFileReaderFactory
@@ -62,11 +64,11 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
     val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
       .createPartitionedFile(partitionValues, filePath, start, length)
     if (FSUtils.isLogFile(filePath)) {
-      val structType: StructType = 
HoodieInternalRowUtils.getCachedSchema(dataSchema)
+      val structType: StructType = 
HoodieInternalRowUtils.getCachedSchema(requiredSchema)
       val projection: UnsafeProjection = 
HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
       new CloseableMappingIterator[InternalRow, UnsafeRow](
         sparkFileReaderFactory.newParquetFileReader(conf, 
filePath).asInstanceOf[HoodieSparkParquetReader]
-          .getInternalRowIterator(dataSchema, dataSchema),
+          .getInternalRowIterator(dataSchema, requiredSchema),
         new java.util.function.Function[InternalRow, UnsafeRow] {
           override def apply(data: InternalRow): UnsafeRow = {
             // NOTE: We have to do [[UnsafeProjection]] of incoming 
[[InternalRow]] to convert
@@ -75,7 +77,11 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
           }
         }).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
-      new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+      if (baseFileReader.isEmpty) {
+        throw new IllegalArgumentException("Base file reader is missing when 
instantiating "
+          + "SparkFileFormatInternalRowReaderContext.");
+      }
+      new CloseableInternalRowIterator(baseFileReader.get.apply(fileInfo))
     }
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index febc0d32466..439948a6cc9 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -40,8 +40,9 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -78,40 +79,71 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                                   Schema schema,
                                                   String fileGroupId);
 
-  @Test
-  public void testReadFileGroupInMergeOnReadTable() throws Exception {
-    Map<String, String> writeConfigs = new HashMap<>();
-    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
"parquet");
-    writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
-    writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partition_path");
-    writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
-    writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
-    writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
-    writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
-    writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4");
-    writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
-    writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
-    writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
-    writeConfigs.put("hoodie.compact.inline", "false");
+  @ParameterizedTest
+  @ValueSource(strings = {"avro", "parquet"})
+  public void testReadFileGroupInMergeOnReadTable(String logDataBlockFormat) 
throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
logDataBlockFormat);
 
     try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a base file only
       commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), 
INSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), 
dataGen.getPartitionPaths(), 0);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
0);
 
       // Two commits; reading one file group containing a base file and a log 
file
       commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), 
UPSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), 
dataGen.getPartitionPaths(), 1);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
1);
 
       // Three commits; reading one file group containing a base file and two 
log files
       commitToTable(recordsToStrings(dataGen.generateUpdates("003", 100)), 
UPSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), 
dataGen.getPartitionPaths(), 2);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
2);
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"avro", "parquet"})
+  public void testReadLogFilesOnlyInMergeOnReadTable(String 
logDataBlockFormat) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
logDataBlockFormat);
+    // Use InMemoryIndex to generate log only mor table
+    writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+    try (HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF)) {
+      // One commit; reading one file group containing a base file only
+      commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), 
INSERT.value(), writeConfigs);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), false, 
1);
+
+      // Two commits; reading one file group containing a base file and a log 
file
+      commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), 
UPSERT.value(), writeConfigs);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), false, 
2);
+    }
+  }
+
+  private Map<String, String> getCommonConfigs() {
+    Map<String, String> configMapping = new HashMap<>();
+    configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
+    configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partition_path");
+    configMapping.put("hoodie.datasource.write.precombine.field", "timestamp");
+    configMapping.put("hoodie.payload.ordering.field", "timestamp");
+    configMapping.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
+    configMapping.put("hoodie.insert.shuffle.parallelism", "4");
+    configMapping.put("hoodie.upsert.shuffle.parallelism", "4");
+    configMapping.put("hoodie.bulkinsert.shuffle.parallelism", "2");
+    configMapping.put("hoodie.delete.shuffle.parallelism", "1");
+    configMapping.put("hoodie.merge.small.file.group.candidates.limit", "0");
+    configMapping.put("hoodie.compact.inline", "false");
+    return configMapping;
+  }
+
   private void validateOutputFromFileGroupReader(Configuration hadoopConf,
                                                  String tablePath,
                                                  String[] partitionPaths,
+                                                 boolean containsBaseFile,
                                                  int expectedLogFileNum) 
throws Exception {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
         .setConf(hadoopConf).setBasePath(tablePath).build();
@@ -138,6 +170,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       props.setProperty(PARTITION_FIELDS.key(), 
metaClient.getTableConfig().getString(PARTITION_FIELDS));
     }
     String[] partitionValues = partitionPaths[0].isEmpty() ? new String[] {} : 
new String[] {partitionPaths[0]};
+    assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
     HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
         getHoodieReaderContext(tablePath, partitionValues),
         hadoopConf,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 495569b2ce8..4073d064dd5 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -21,18 +21,15 @@ import kotlin.NotImplementedError
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, 
HoodieCDCFileGroupSplit}
+import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, 
HoodieRecord}
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, 
HoodieFileGroupId, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
-import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, 
HoodieCDCFileGroupSplit}
-import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.HoodieFileGroupId
 import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping,
-  HoodieSparkUtils, HoodieTableSchema, HoodieTableState, 
MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping,
-  SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, 
MergeOnReadSnapshotRelation, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -42,7 +39,6 @@ import 
org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
-import org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -110,8 +106,21 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
         case fileSliceMapping: HoodiePartitionFileSliceMapping =>
           val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
           if (FSUtils.isLogFile(filePath)) {
-            // TODO: Use FileGroupReader here: HUDI-6942.
-            throw new NotImplementedError("Not support reading with only log 
files")
+            val partitionValues = fileSliceMapping.getPartitionValues
+            val fileSlice = 
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+            buildFileGroupIterator(
+              Option.empty[PartitionedFile => Iterator[InternalRow]],
+              partitionValues,
+              Option.empty[HoodieBaseFile],
+              getLogFilesFromSlice(fileSlice),
+              requiredSchemaWithMandatory,
+              outputSchema,
+              partitionSchema,
+              broadcastedHadoopConf.value.value,
+              -1,
+              -1,
+              shouldUseRecordPosition
+            )
           } else {
             fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) 
match {
               case Some(fileSlice) =>
@@ -132,9 +141,9 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                         + "since it has no log or data files")
                   }
                   buildFileGroupIterator(
-                    preMergeBaseFileReader,
+                    Option(preMergeBaseFileReader),
                     partitionValues,
-                    hoodieBaseFile,
+                    Option(hoodieBaseFile),
                     logFiles,
                     requiredSchemaWithMandatory,
                     outputSchema,
@@ -180,9 +189,9 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       props)
   }
 
-  protected def buildFileGroupIterator(preMergeBaseFileReader: PartitionedFile 
=> Iterator[InternalRow],
+  protected def buildFileGroupIterator(preMergeBaseFileReader: 
Option[PartitionedFile => Iterator[InternalRow]],
                                        partitionValues: InternalRow,
-                                       baseFile: HoodieBaseFile,
+                                       baseFile: Option[HoodieBaseFile],
                                        logFiles: List[HoodieLogFile],
                                        requiredSchemaWithMandatory: StructType,
                                        outputSchema: StructType,
@@ -200,7 +209,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       hadoopConf,
       tableState.tablePath,
       tableState.latestCommitTimestamp.get,
-      HOption.of(baseFile),
+      if (baseFile.nonEmpty) HOption.of(baseFile.get) else HOption.empty(),
       HOption.of(logFiles.map(f => f.getPath.toString).asJava),
       HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, 
tableName),
       metaClient.getTableConfig.getProps,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index a1441be5ddc..087ae50ff02 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -21,6 +21,8 @@ package org.apache.hudi.common.table.read
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
+import 
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -98,7 +100,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     }
 
     val partitionValueRow = new 
GenericInternalRow(partitionValuesEncoded.toArray[Any])
-    new SparkFileFormatInternalRowReaderContext(recordReaderIterator, 
partitionValueRow)
+    new SparkFileFormatInternalRowReaderContext(Option(recordReaderIterator), 
partitionValueRow)
   }
 
   override def commitToTable(recordList: util.List[String], operation: String, 
options: util.Map[String, String]): Unit = {
@@ -119,6 +121,8 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
                                           schema: Schema,
                                           fileGroupId: String): Unit = {
     val expectedDf = spark.read.format("hudi")
+      .option(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), "false")
+      .option(FILE_GROUP_READER_ENABLED.key(), "false")
       .load(basePath)
       .where(col(HoodieRecord.FILENAME_METADATA_FIELD).contains(fileGroupId))
     assertEquals(expectedDf.count, actualRecordList.size)

Reply via email to