This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 530640f61a5  [HUDI-7055] Support reading only log files in file group reader-based Spark parquet file format (#10020)
530640f61a5 is described below

commit 530640f61a5e3f8103d0b66ff866a2de995156b7
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Wed Nov 8 18:45:45 2023 -0800

    [HUDI-7055] Support reading only log files in file group reader-based Spark parquet file format (#10020)
---
 .../SparkFileFormatInternalRowReaderContext.scala  | 16 +++--
 .../table/read/TestHoodieFileGroupReaderBase.java  | 71 ++++++++++++++++------
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 39 +++++++-----
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  6 +-
 4 files changed, 92 insertions(+), 40 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index af3d3fd239c..beca8852686 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -44,10 +44,12 @@ import scala.collection.mutable
  *
  * This uses Spark parquet reader to read parquet data files or parquet log blocks.
  *
- * @param baseFileReader A reader that transforms a {@link PartitionedFile} to an iterator of {@link InternalRow}.
+ * @param baseFileReader  A reader that transforms a {@link PartitionedFile} to an iterator of
+ *                        {@link InternalRow}. This is required for reading the base file and
+ *                        not required for reading a file group with only log files.
  * @param partitionValues The values for a partition in which the file group lives.
  */
-class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile => Iterator[InternalRow],
+class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[PartitionedFile => Iterator[InternalRow]],
                                               partitionValues: InternalRow) extends BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
   lazy val sparkFileReaderFactory = new HoodieSparkFileReaderFactory
@@ -62,11 +64,11 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
     val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
       .createPartitionedFile(partitionValues, filePath, start, length)
     if (FSUtils.isLogFile(filePath)) {
-      val structType: StructType = HoodieInternalRowUtils.getCachedSchema(dataSchema)
+      val structType: StructType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
       val projection: UnsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
       new CloseableMappingIterator[InternalRow, UnsafeRow](
         sparkFileReaderFactory.newParquetFileReader(conf, filePath).asInstanceOf[HoodieSparkParquetReader]
-          .getInternalRowIterator(dataSchema, dataSchema),
+          .getInternalRowIterator(dataSchema, requiredSchema),
         new java.util.function.Function[InternalRow, UnsafeRow] {
           override def apply(data: InternalRow): UnsafeRow = {
             // NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert
@@ -75,7 +77,11 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
           }
         }).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
-      new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+      if (baseFileReader.isEmpty) {
+        throw new IllegalArgumentException("Base file reader is missing when instantiating "
+          + "SparkFileFormatInternalRowReaderContext.");
+      }
+      new CloseableInternalRowIterator(baseFileReader.get.apply(fileInfo))
     }
   }
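The substance of the hunks above: the base file reader becomes optional, because a file group consisting of only log files has no base file to read, and parquet log blocks are now projected to the required schema rather than the full data schema. A minimal caller-side sketch, not part of the commit; `buildParquetReader` and `partitionValueRow` are hypothetical stand-ins for values the caller already has:

    // Hypothetical sketch of constructing the reader context after this change.
    val parquetReader: PartitionedFile => Iterator[InternalRow] = buildParquetReader()
    // File group with a base file: wrap the reader in an Option.
    val readerContext =
      new SparkFileFormatInternalRowReaderContext(Option(parquetReader), partitionValueRow)
    // File group with only log files: no base file reader is needed.
    val logOnlyContext =
      new SparkFileFormatInternalRowReaderContext(None, partitionValueRow)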
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index febc0d32466..439948a6cc9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -40,8 +40,9 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -78,40 +79,71 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                       Schema schema,
                                       String fileGroupId);
 
-  @Test
-  public void testReadFileGroupInMergeOnReadTable() throws Exception {
-    Map<String, String> writeConfigs = new HashMap<>();
-    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
-    writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
-    writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
-    writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
-    writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
-    writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
-    writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
-    writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4");
-    writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
-    writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
-    writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
-    writeConfigs.put("hoodie.compact.inline", "false");
+  @ParameterizedTest
+  @ValueSource(strings = {"avro", "parquet"})
+  public void testReadFileGroupInMergeOnReadTable(String logDataBlockFormat) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
 
     try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
       // One commit; reading one file group containing a base file only
       commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), 0);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 0);
 
       // Two commits; reading one file group containing a base file and a log file
       commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), UPSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), 1);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 1);
 
       // Three commits; reading one file group containing a base file and two log files
       commitToTable(recordsToStrings(dataGen.generateUpdates("003", 100)), UPSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), 2);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 2);
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"avro", "parquet"})
+  public void testReadLogFilesOnlyInMergeOnReadTable(String logDataBlockFormat) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
+    // Use InMemoryIndex to generate log only mor table
+    writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
+      // One commit; reading one file group containing a base file only
+      commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), false, 1);
+
+      // Two commits; reading one file group containing a base file and a log file
+      commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), UPSERT.value(), writeConfigs);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), false, 2);
+    }
+  }
+
+  private Map<String, String> getCommonConfigs() {
+    Map<String, String> configMapping = new HashMap<>();
+    configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
+    configMapping.put("hoodie.datasource.write.precombine.field", "timestamp");
+    configMapping.put("hoodie.payload.ordering.field", "timestamp");
+    configMapping.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
+    configMapping.put("hoodie.insert.shuffle.parallelism", "4");
+    configMapping.put("hoodie.upsert.shuffle.parallelism", "4");
+    configMapping.put("hoodie.bulkinsert.shuffle.parallelism", "2");
+    configMapping.put("hoodie.delete.shuffle.parallelism", "1");
+    configMapping.put("hoodie.merge.small.file.group.candidates.limit", "0");
+    configMapping.put("hoodie.compact.inline", "false");
+    return configMapping;
+  }
+
   private void validateOutputFromFileGroupReader(Configuration hadoopConf,
                                                  String tablePath,
                                                  String[] partitionPaths,
+                                                 boolean containsBaseFile,
                                                  int expectedLogFileNum) throws Exception {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
         .setConf(hadoopConf).setBasePath(tablePath).build();
@@ -138,6 +170,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
     }
     String[] partitionValues = partitionPaths[0].isEmpty() ? new String[] {} : new String[] {partitionPaths[0]};
+    assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
     HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
         getHoodieReaderContext(tablePath, partitionValues),
         hadoopConf,
"4"); + configMapping.put("hoodie.bulkinsert.shuffle.parallelism", "2"); + configMapping.put("hoodie.delete.shuffle.parallelism", "1"); + configMapping.put("hoodie.merge.small.file.group.candidates.limit", "0"); + configMapping.put("hoodie.compact.inline", "false"); + return configMapping; + } + private void validateOutputFromFileGroupReader(Configuration hadoopConf, String tablePath, String[] partitionPaths, + boolean containsBaseFile, int expectedLogFileNum) throws Exception { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(hadoopConf).setBasePath(tablePath).build(); @@ -138,6 +170,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> { props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS)); } String[] partitionValues = partitionPaths[0].isEmpty() ? new String[] {} : new String[] {partitionPaths[0]}; + assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent()); HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>( getHoodieReaderContext(tablePath, partitionValues), hadoopConf, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala index 495569b2ce8..4073d064dd5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala @@ -21,18 +21,15 @@ import kotlin.NotImplementedError import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile +import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGroupSplit} +import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.engine.HoodieReaderContext import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, HoodieRecord} +import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieFileGroupId, HoodieLogFile, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.read.HoodieFileGroupReader -import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGroupSplit} -import org.apache.hudi.common.config.TypedProperties -import org.apache.hudi.common.model.HoodieFileGroupId import org.apache.hudi.common.util.{Option => HOption} -import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, - HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping, - SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} +import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -42,7 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index a1441be5ddc..087ae50ff02 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -21,6 +21,8 @@ package org.apache.hudi.common.table.read
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
+import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -98,7 +100,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
     }
     val partitionValueRow = new GenericInternalRow(partitionValuesEncoded.toArray[Any])
 
-    new SparkFileFormatInternalRowReaderContext(recordReaderIterator, partitionValueRow)
+    new SparkFileFormatInternalRowReaderContext(Option(recordReaderIterator), partitionValueRow)
   }
 
   override def commitToTable(recordList: util.List[String], operation: String, options: util.Map[String, String]): Unit = {
@@ -119,6 +121,8 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
                                         schema: Schema,
                                         fileGroupId: String): Unit = {
     val expectedDf = spark.read.format("hudi")
+      .option(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), "false")
+      .option(FILE_GROUP_READER_ENABLED.key(), "false")
       .load(basePath)
       .where(col(HoodieRecord.FILENAME_METADATA_FIELD).contains(fileGroupId))
     assertEquals(expectedDf.count, actualRecordList.size)
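The expected DataFrame in this test is now computed with both the file group reader and the new parquet file format disabled, so the new reader's output is checked against the pre-existing read path rather than against itself. A hedged sketch of that legacy-path read, assuming `spark` and `basePath` are in scope and using the constants imported in the hunk above:

    // Sketch: produce expected results via the legacy read path.
    val expectedDf = spark.read.format("hudi")
      .option(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), "false") // bypass the format under test
      .option(FILE_GROUP_READER_ENABLED.key(), "false")        // bypass the file group reader
      .load(basePath)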