This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 5e769ecb66e  [HUDI-7278] make bloom filter skippable for CPU saving (#10457)
5e769ecb66e is described below

commit 5e769ecb66e1d1b6f8c270f7b10f3860a927f273
Author: kongwei <kong...@pku.edu.cn>
AuthorDate: Fri Jan 12 17:37:51 2024 +0800

    [HUDI-7278] make bloom filter skippable for CPU saving (#10457)

    * make bloom filter skippable for CPU saving

    ---------

    Co-authored-by: wei.kong <wei.k...@shopee.com>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 +++
 .../io/storage/HoodieSparkFileWriterFactory.java   |  3 +--
 .../storage/TestHoodieAvroFileWriterFactory.java   | 31 ++++++++++++++++++++++
 .../hudi/common/config/HoodieStorageConfig.java    | 11 ++++++++
 .../io/storage/HoodieAvroFileWriterFactory.java    |  3 +--
 .../hudi/io/storage/HoodieFileWriterFactory.java   | 10 +++++++
 6 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index dd589cf4618..8fd3546671e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2150,6 +2150,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED);
   }
 
+  public boolean parquetBloomFilterEnabled() {
+    return getBooleanOrDefault(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED);
+  }
+
   public Option<HoodieLogBlock.HoodieLogBlockType> getLogDataBlockFormat() {
     return Option.ofNullable(getString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT))
         .map(HoodieLogBlock.HoodieLogBlockType::fromId);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index d2ab83f1481..5feefa3bee2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -44,8 +44,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
       String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
-    boolean enableBloomFilter = populateMetaFields;
-    Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
+    Option<BloomFilter> filter = enableBloomFilter(populateMetaFields, config) ? Option.of(createBloomFilter(config)) : Option.empty();
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support PARQUET_COMPRESSION_CODEC_NAME is ""
     if (compressionCodecName.isEmpty()) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
index 3afe6ee6708..120ae4fe891 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
@@ -19,9 +19,11 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestBase;
@@ -31,6 +33,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -74,4 +77,32 @@ public class TestHoodieAvroFileWriterFactory extends HoodieClientTestBase {
     }, "should fail since log storage writer is not supported yet.");
     assertTrue(thrown.getMessage().contains("format not supported yet."));
   }
+
+  @Test
+  public void testEnableBloomFilter() {
+    HoodieWriteConfig config = getConfig(IndexType.BLOOM);
+    assertTrue(HoodieFileWriterFactory.enableBloomFilter(true, config));
+    assertFalse(HoodieFileWriterFactory.enableBloomFilter(false, config));
+
+    config = getConfig(IndexType.SIMPLE);
+    assertTrue(HoodieFileWriterFactory.enableBloomFilter(true, config));
+
+    config = getConfig(IndexType.SIMPLE);
+    assertTrue(HoodieFileWriterFactory.enableBloomFilter(true, config));
+
+    config = getConfigBuilder(IndexType.BLOOM)
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .parquetBloomFilterEnable(false).build()).build();
+    assertTrue(HoodieFileWriterFactory.enableBloomFilter(true, config));
+
+    config = getConfigBuilder(IndexType.SIMPLE)
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .parquetBloomFilterEnable(true).build()).build();
+    assertTrue(HoodieFileWriterFactory.enableBloomFilter(true, config));
+
+    config = getConfigBuilder(IndexType.SIMPLE)
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .parquetBloomFilterEnable(false).build()).build();
+    assertFalse(HoodieFileWriterFactory.enableBloomFilter(true, config));
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 2660b0b22c8..d68b8326ca8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -152,6 +152,12 @@ public class HoodieStorageConfig extends HoodieConfig {
       .withDocumentation("Would only be effective with Spark 3.3+. Sets spark.sql.parquet.fieldId.write.enabled. "
           + "If enabled, Spark will write out parquet native field ids that are stored inside StructField's metadata as parquet.field.id to parquet files.");
 
+  public static final ConfigProperty<Boolean> PARQUET_WITH_BLOOM_FILTER_ENABLED = ConfigProperty
+      .key("hoodie.parquet.bloom.filter.enabled")
+      .defaultValue(true)
+      .withDocumentation("Control whether to write bloom filter or not. Default true. "
+          + "We can set to false in non bloom index cases for CPU resource saving.");
+
   public static final ConfigProperty<String> HFILE_COMPRESSION_ALGORITHM_NAME = ConfigProperty
       .key("hoodie.hfile.compression.algorithm")
       .defaultValue("GZ")
@@ -420,6 +426,11 @@ public class HoodieStorageConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder parquetBloomFilterEnable(boolean parquetBloomFilterEnable) {
+      storageConfig.setValue(PARQUET_WITH_BLOOM_FILTER_ENABLED, String.valueOf(parquetBloomFilterEnable));
+      return this;
+    }
+
     public Builder hfileCompressionAlgorithm(String hfileCompressionAlgorithm) {
       storageConfig.setValue(HFILE_COMPRESSION_ALGORITHM_NAME, hfileCompressionAlgorithm);
       return this;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
index 8ed597ed920..471ab149fa5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
@@ -51,8 +51,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
       String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
-    boolean enableBloomFilter = populateMetaFields;
-    HoodieAvroWriteSupport writeSupport = getHoodieAvroWriteSupport(conf, schema, config, enableBloomFilter);
+    HoodieAvroWriteSupport writeSupport = getHoodieAvroWriteSupport(conf, schema, config, enableBloomFilter(populateMetaFields, config));
 
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support PARQUET_COMPRESSION_CODEC_NAME is ""
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index b968d0572b8..133feedb0ea 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -128,4 +128,14 @@ public class HoodieFileWriterFactory {
         config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES),
         config.getStringOrDefault(HoodieStorageConfig.BLOOM_FILTER_TYPE));
   }
+
+  /**
+   * Check if need to enable bloom filter.
+   */
+  public static boolean enableBloomFilter(boolean populateMetaFields, HoodieConfig config) {
+    return populateMetaFields && (config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED)
+        // HoodieIndexConfig is located in the package hudi-client-common, and the package hudi-client-common depends on the package hudi-common,
+        // so the class HoodieIndexConfig cannot be accessed in hudi-common, otherwise there will be a circular dependency problem
+        || (config.contains("hoodie.index.type") && config.getString("hoodie.index.type").contains("BLOOM")));
+  }
 }