This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new cb642ce [HUDI-1999] Refresh the base file view cache for WriteProfile (#3067) cb642ce is described below commit cb642ceb75ef903a77593af943c6ba19053257ee Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Tue Jun 15 23:18:38 2021 +0800 [HUDI-1999] Refresh the base file view cache for WriteProfile (#3067) Refresh the view to discover new small files. --- .../apache/hudi/io/FlinkMergeAndReplaceHandle.java | 12 ++- .../hudi/sink/StreamWriteOperatorCoordinator.java | 4 +- .../partitioner/profile/DeltaWriteProfile.java | 5 +- .../sink/partitioner/profile/WriteProfile.java | 37 ++++++++- .../sink/partitioner/profile/WriteProfiles.java | 94 ++++++++++++++++++++++ .../hudi/source/StreamReadMonitoringFunction.java | 58 +------------ .../hudi/sink/partitioner/TestBucketAssigner.java | 48 ++++++++++- 7 files changed, 197 insertions(+), 61 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java index c87f3dd..f414450 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java @@ -55,6 +55,11 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O> private boolean isClosed = false; + /** + * Flag saying whether we should replace the old file with new. + */ + private boolean shouldReplace = true; + public FlinkMergeAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Path basePath) { @@ -103,11 +108,12 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O> @Override protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) { // old and new file name expects to be the same. - if (!oldFileName.equals(newFileName)) { + if (!FSUtils.getCommitTime(oldFileName).equals(instantTime)) { LOG.warn("MERGE and REPLACE handle expect the same name for old and new files,\n" + "while got new file: " + newFileName + " with old file: " + oldFileName + ",\n" + "this rarely happens when the checkpoint success event was not received yet\n" + "but the write task flush with new instant time, which does not break the UPSERT semantics"); + shouldReplace = false; } super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName); try { @@ -146,6 +152,10 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O> } public void finalizeWrite() { + // Behaves like the normal merge handle if the write instant time changes. + if (!shouldReplace) { + return; + } // The file visibility should be kept by the configured ConsistencyGuard instance. try { fs.delete(oldFilePath, false); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index ddbb3dd..4963d31 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -206,11 +206,11 @@ public class StreamWriteOperatorCoordinator } // start new instant. 
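The FlinkMergeAndReplaceHandle hunk above keys the replace decision on the commit time embedded in the old file's name rather than on old/new name equality: when the old file belongs to an earlier instant, the handle sets shouldReplace to false and finalizeWrite leaves the old file in place, behaving like a normal merge handle. A minimal, framework-free sketch of that decision, assuming the usual Hudi base file naming <fileId>_<writeToken>_<instantTime>.<extension>; the class and method names below are illustrative only:

    // Illustrative sketch; mirrors the shouldReplace guard above.
    public class ReplaceGuardSketch {

      // Extract the instant time from a base file name, e.g.
      // "f1-0_4-9-0_20210615231838.parquet" -> "20210615231838".
      static String commitTimeOf(String baseFileName) {
        String[] parts = baseFileName.split("_");
        return parts[parts.length - 1].split("\\.")[0];
      }

      // Replace the old file only when it was written under the current instant;
      // otherwise keep it and fall back to plain merge-handle behavior.
      static boolean shouldReplace(String oldFileName, String handleInstantTime) {
        return commitTimeOf(oldFileName).equals(handleInstantTime);
      }

      public static void main(String[] args) {
        System.out.println(shouldReplace("f1-0_4-9-0_20210615231838.parquet", "20210615231838")); // true
        System.out.println(shouldReplace("f1-0_4-9-0_20210615231838.parquet", "20210615231900")); // false
      }
    }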
startInstant(); + // sync Hive if is enabled + syncHiveIfEnabled(); } }, "commits the instant %s", this.instant ); - // sync Hive if is enabled - syncHiveIfEnabled(); } private void syncHiveIfEnabled() { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index 8f8c692..9f56bdd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -54,12 +54,13 @@ public class DeltaWriteProfile extends WriteProfile { // Find out all eligible small file slices if (!commitTimeline.empty()) { HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + // initialize the filesystem view based on the commit metadata + initFSViewIfNecessary(commitTimeline); // find smallest file in partition and append to it List<FileSlice> allSmallFileSlices = new ArrayList<>(); // If we can index log files, we can add more inserts to log files for fileIds including those under // pending compaction. - List<FileSlice> allFileSlices = - table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) + List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) .collect(Collectors.toList()); for (FileSlice fileSlice : allFileSlices) { if (isSmallFile(fileSlice)) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 3aee176..71d0d83 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -25,12 +25,17 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sink.partitioner.BucketAssigner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SmallFile; +import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.core.fs.Path; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +61,11 @@ public class WriteProfile { protected final HoodieWriteConfig config; /** + * Table base path. + */ + private final Path basePath; + + /** * The hoodie table. */ protected final HoodieTable<?, ?, ?, ?> table; @@ -81,11 +91,23 @@ public class WriteProfile { */ private long reloadedCheckpointId; + /** + * The file system view cache for one checkpoint interval. + */ + protected HoodieTableFileSystemView fsView; + + /** + * Hadoop configuration. 
+ */ + private final Configuration hadoopConf; + public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) { this.config = config; + this.basePath = new Path(config.getBasePath()); this.smallFilesMap = new HashMap<>(); this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize(); this.table = HoodieFlinkTable.create(config, context); + this.hadoopConf = StreamerUtil.getHadoopConf(); // profile the record statistics on construction recordProfile(); } @@ -160,7 +182,9 @@ public class WriteProfile { if (!commitTimeline.empty()) { // if we have some commits HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView() + // initialize the filesystem view based on the commit metadata + initFSViewIfNecessary(commitTimeline); + List<HoodieBaseFile> allFiles = fsView .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); for (HoodieBaseFile file : allFiles) { @@ -178,6 +202,16 @@ public class WriteProfile { return smallFileLocations; } + protected void initFSViewIfNecessary(HoodieTimeline commitTimeline) { + if (fsView == null) { + List<HoodieCommitMetadata> metadataList = commitTimeline.getInstants() + .map(instant -> WriteProfiles.getCommitMetadata(config.getTableName(), basePath, instant, commitTimeline)) + .collect(Collectors.toList()); + FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList); + fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles); + } + } + private void recordProfile() { this.avgSize = averageBytesPerRecord(); if (config.shouldAllowMultiWriteOnSameInstant()) { @@ -200,6 +234,7 @@ public class WriteProfile { return; } recordProfile(); + this.fsView = null; this.smallFilesMap.clear(); this.table.getMetaClient().reloadActiveTimeline(); this.reloadedCheckpointId = checkpointId; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java index cf99ab3..9a8b7d0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java @@ -19,15 +19,33 @@ package org.apache.hudi.sink.partitioner.profile; import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.flink.core.fs.Path; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Factory for {@link WriteProfile}. 
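The WriteProfile changes above introduce a file system view that is built lazily from the completed commit metadata, reused for every small-file lookup within a checkpoint interval, and dropped in reload() so the next lookup picks up base files committed since the last checkpoint. A minimal, framework-free sketch of that lifecycle (the class below is illustrative and not part of the commit; the real reload() keeps a reloadedCheckpointId field for the same purpose):

    import java.util.function.Supplier;

    // Sketch of the cache lifecycle behind WriteProfile#fsView: build on first
    // access, reuse within the checkpoint interval, invalidate on reload.
    public class CheckpointScopedCache<V> {
      private final Supplier<V> loader;   // plays the role of initFSViewIfNecessary
      private V cached;                   // plays the role of the fsView field
      private long reloadedCheckpointId;

      public CheckpointScopedCache(Supplier<V> loader) {
        this.loader = loader;
      }

      public V get() {
        if (cached == null) {
          cached = loader.get();          // rebuilt from the freshly reloaded timeline
        }
        return cached;
      }

      public void reload(long checkpointId) {
        if (checkpointId <= reloadedCheckpointId) {
          return;                         // already refreshed for this checkpoint
        }
        cached = null;                    // next get() rebuilds the view
        reloadedCheckpointId = checkpointId;
      }
    }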
*/ public class WriteProfiles { + private static final Logger LOG = LoggerFactory.getLogger(WriteProfiles.class); + private static final Map<String, WriteProfile> PROFILES = new HashMap<>(); private WriteProfiles() {} @@ -58,4 +76,80 @@ public class WriteProfiles { public static void clean(String path) { PROFILES.remove(path); } + + /** + * Returns all the incremental write file path statuses with the given commits metadata. + * + * @param basePath Table base path + * @param hadoopConf The hadoop conf + * @param metadataList The commits metadata + * @return the file statuses array + */ + public static FileStatus[] getWritePathsOfInstants( + Path basePath, + Configuration hadoopConf, + List<HoodieCommitMetadata> metadataList) { + FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf); + return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs)) + .flatMap(Collection::stream).toArray(FileStatus[]::new); + } + + /** + * Returns the commit file paths with given metadata. + * + * @param basePath Table base path + * @param metadata The metadata + * @param fs The filesystem + * + * @return the commit file status list + */ + private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) { + return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream() + .map(org.apache.hadoop.fs.Path::new) + // filter out the file paths that does not exist, some files may be cleaned by + // the cleaner. + .filter(path -> { + try { + return fs.exists(path); + } catch (IOException e) { + LOG.error("Checking exists of path: {} error", path); + throw new HoodieException(e); + } + }).map(path -> { + try { + return fs.getFileStatus(path); + } catch (IOException e) { + LOG.error("Get write status of path: {} error", path); + throw new HoodieException(e); + } + }) + // filter out crushed files + .filter(fileStatus -> fileStatus.getLen() > 0) + .collect(Collectors.toList()); + } + + /** + * Returns the commit metadata of the given instant. 
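Before handing statuses to the file system view, getWritePathsOfInstant above defends against two situations: paths that the cleaner has already removed, and zero-length files (the "crushed files" in the in-code comment, presumably meaning crashed, partially written files). A standalone sketch of the same filtering with the plain Hadoop FileSystem API; the helper class and method names are illustrative:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    final class WritePathFilterSketch {
      // Keep only statuses of paths that still exist and have real content.
      static List<FileStatus> existingNonEmptyStatuses(FileSystem fs, Collection<String> fullPaths)
          throws IOException {
        List<FileStatus> statuses = new ArrayList<>();
        for (String fullPath : fullPaths) {
          Path path = new Path(fullPath);
          if (!fs.exists(path)) {
            continue; // already removed by the cleaner, nothing to index
          }
          FileStatus status = fs.getFileStatus(path);
          if (status.getLen() > 0) {
            statuses.add(status); // zero-length files are leftovers of crashed writes
          }
        }
        return statuses;
      }
    }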
+ * + * @param tableName The table name + * @param basePath The table base path + * @param instant The hoodie instant + * @param timeline The timeline + * + * @return the commit metadata + */ + public static HoodieCommitMetadata getCommitMetadata( + String tableName, + Path basePath, + HoodieInstant instant, + HoodieTimeline timeline) { + byte[] data = timeline.getInstantDetails(instant).get(); + try { + return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + } catch (IOException e) { + LOG.error("Get write metadata for table {} with instant {} and path: {} error", + tableName, instant.getTimestamp(), basePath); + throw new HoodieException(e); + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index 3a2de84..983c19f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -18,7 +18,6 @@ package org.apache.hudi.source; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; @@ -30,7 +29,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.partitioner.profile.WriteProfiles; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.StreamerUtil; @@ -46,11 +45,9 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -220,10 +217,11 @@ public class StreamReadMonitoringFunction // 3. filter the full file paths // 4. use the file paths from #step 3 as the back-up of the filesystem view + String tableName = conf.getString(FlinkOptions.TABLE_NAME); List<HoodieCommitMetadata> metadataList = instants.stream() - .map(instant -> getCommitMetadata(instant, commitTimeline)).collect(Collectors.toList()); + .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList()); Set<String> writePartitions = getWritePartitionPaths(metadataList); - FileStatus[] fileStatuses = getWritePathsOfInstants(metadataList); + FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList); if (fileStatuses.length == 0) { LOG.warn("No files found for reading in user provided path."); return; @@ -334,52 +332,4 @@ public class StreamReadMonitoringFunction .flatMap(Collection::stream) .collect(Collectors.toSet()); } - - /** - * Returns all the incremental write file path statuses with the given commits metadata. 
- * - * @param metadataList The commits metadata - * @return the file statuses array - */ - private FileStatus[] getWritePathsOfInstants(List<HoodieCommitMetadata> metadataList) { - FileSystem fs = FSUtils.getFs(path.toString(), hadoopConf); - return metadataList.stream().map(metadata -> getWritePathsOfInstant(metadata, fs)) - .flatMap(Collection::stream).toArray(FileStatus[]::new); - } - - private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) { - return metadata.getFileIdAndFullPaths(path.toString()).values().stream() - .map(org.apache.hadoop.fs.Path::new) - // filter out the file paths that does not exist, some files may be cleaned by - // the cleaner. - .filter(path -> { - try { - return fs.exists(path); - } catch (IOException e) { - LOG.error("Checking exists of path: {} error", path); - throw new HoodieException(e); - } - }).map(path -> { - try { - return fs.getFileStatus(path); - } catch (IOException e) { - LOG.error("Get write status of path: {} error", path); - throw new HoodieException(e); - } - }) - // filter out crushed files - .filter(fileStatus -> fileStatus.getLen() > 0) - .collect(Collectors.toList()); - } - - private HoodieCommitMetadata getCommitMetadata(HoodieInstant instant, HoodieTimeline timeline) { - byte[] data = timeline.getInstantDetails(instant).get(); - try { - return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); - } catch (IOException e) { - LOG.error("Get write metadata for table {} with instant {} and path: {} error", - conf.getString(FlinkOptions.TABLE_NAME), instant.getTimestamp(), path); - throw new HoodieException(e); - } - } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 1c895b6..3efa444 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -22,6 +22,8 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sink.partitioner.profile.WriteProfile; import org.apache.hudi.table.action.commit.BucketInfo; @@ -29,6 +31,7 @@ import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.commit.SmallFile; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.BeforeEach; @@ -45,6 +48,10 @@ import java.util.Map; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for {@link BucketAssigner}. 
@@ -52,6 +59,7 @@ import static org.hamcrest.MatcherAssert.assertThat; public class TestBucketAssigner { private HoodieWriteConfig writeConfig; private HoodieFlinkEngineContext context; + private Configuration conf; @TempDir File tempFile; @@ -59,7 +67,7 @@ public class TestBucketAssigner { @BeforeEach public void before() throws IOException { final String basePath = tempFile.getAbsolutePath(); - final Configuration conf = TestConfigurations.getDefaultConf(basePath); + conf = TestConfigurations.getDefaultConf(basePath); writeConfig = StreamerUtil.getHoodieClientConfig(conf); context = new HoodieFlinkEngineContext( @@ -291,6 +299,44 @@ public class TestBucketAssigner { assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1"); } + @Test + public void testWriteProfileReload() throws Exception { + WriteProfile writeProfile = new WriteProfile(writeConfig, context); + List<SmallFile> smallFiles1 = writeProfile.getSmallFiles("par1"); + assertTrue(smallFiles1.isEmpty(), "Should have no small files"); + + TestData.writeData(TestData.DATA_SET_INSERT, conf); + Option<String> instantOption = getLastCompleteInstant(writeProfile); + assertFalse(instantOption.isPresent()); + + writeProfile.reload(1); + String instant1 = getLastCompleteInstant(writeProfile).orElse(null); + assertNotNull(instant1); + List<SmallFile> smallFiles2 = writeProfile.getSmallFiles("par1"); + assertThat("Should have 1 small file", smallFiles2.size(), is(1)); + assertThat("Small file should have same timestamp as last complete instant", + smallFiles2.get(0).location.getInstantTime(), is(instant1)); + + TestData.writeData(TestData.DATA_SET_INSERT, conf); + List<SmallFile> smallFiles3 = writeProfile.getSmallFiles("par1"); + assertThat("Should have 1 small file", smallFiles3.size(), is(1)); + assertThat("Non-reloaded write profile has the same base file view as before", + smallFiles3.get(0).location.getInstantTime(), is(instant1)); + + writeProfile.reload(2); + String instant2 = getLastCompleteInstant(writeProfile).orElse(null); + assertNotEquals(instant2, instant1, "Should have new complete instant"); + List<SmallFile> smallFiles4 = writeProfile.getSmallFiles("par1"); + assertThat("Should have 1 small file", smallFiles4.size(), is(1)); + assertThat("Small file should have same timestamp as last complete instant", + smallFiles4.get(0).location.getInstantTime(), is(instant2)); + } + + private static Option<String> getLastCompleteInstant(WriteProfile profile) { + return profile.getTable().getMetaClient().getCommitsTimeline() + .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp); + } + private void assertBucketEquals( BucketInfo bucketInfo, String partition,
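The test above also spells out the caller contract: without reload, getSmallFiles keeps answering from the cached view even after new data is written; after reload(checkpointId) the profile reloads the active timeline, drops fsView, and the next lookup reflects the newly committed base files. A hypothetical caller-side sketch, not taken from the commit, of driving that cycle once per completed checkpoint:

    // Hypothetical driver: refresh the profile when a checkpoint completes so the
    // next round of bucket assignment sees base files committed in the last interval.
    void onCheckpointComplete(long checkpointId, WriteProfile profile, String partitionPath) {
      profile.reload(checkpointId);                                       // reloads the timeline, clears the cached fsView
      List<SmallFile> smallFiles = profile.getSmallFiles(partitionPath);  // now includes newly committed small files
      // ... feed smallFiles into bucket assignment for the next interval ...
    }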