This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 3790b75 [HUDI-368] code clean up in TestAsyncCompaction class (#1050) 3790b75 is described below commit 3790b75e059a06e6f5467c8b8d549ef38cd6b98a Author: Pratyaksh Sharma <pratyaks...@gmail.com> AuthorDate: Wed Dec 11 03:22:41 2019 +0530 [HUDI-368] code clean up in TestAsyncCompaction class (#1050) --- .../java/org/apache/hudi/TestAsyncCompaction.java | 89 +++++++++------------- 1 file changed, 34 insertions(+), 55 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java index 451a2b1..0643708 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java @@ -35,7 +35,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.common.util.AvroUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -108,17 +107,15 @@ public class TestAsyncCompaction extends TestHoodieClientBase { HoodieInstant pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); - assertTrue("Pending Compaction instant has expected instant time", - pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); - assertTrue("Pending Compaction instant has expected state", - pendingCompactionInstant.getState().equals(State.REQUESTED)); + assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(), + compactionInstantTime); + assertEquals("Pending Compaction instant has expected state", 
pendingCompactionInstant.getState(), State.REQUESTED); - moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg); + moveCompactionFromRequestedToInflight(compactionInstantTime, cfg); // Reload and rollback inflight compaction metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); - hoodieTable.rollback(jsc, compactionInstantTime, false); client.rollbackInflightCompaction( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable); @@ -139,11 +136,6 @@ public class TestAsyncCompaction extends TestHoodieClientBase { } } - private Path getInstantPath(HoodieTableMetaClient metaClient, String timestamp, String action, State state) { - HoodieInstant instant = new HoodieInstant(state, action, timestamp); - return new Path(metaClient.getMetaPath(), instant.getFileName()); - } - @Test public void testRollbackInflightIngestionWithPendingCompaction() throws Exception { // Rollback inflight ingestion when there is pending compaction @@ -171,12 +163,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase { metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieInstant pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); - assertTrue("Pending Compaction instant has expected instant time", - pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); + assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(), + compactionInstantTime); HoodieInstant inflightInstant = metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); - assertTrue("inflight instant has expected instant time", - inflightInstant.getTimestamp().equals(inflightInstantTime)); + assertEquals("inflight instant has expected instant time", 
inflightInstant.getTimestamp(), inflightInstantTime); // This should rollback client.startCommitWithTime(nextInflightInstantTime); @@ -184,14 +175,13 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Validate metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); inflightInstant = metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); - assertTrue("inflight instant has expected instant time", - inflightInstant.getTimestamp().equals(nextInflightInstantTime)); - assertTrue("Expect only one inflight instant", - metaClient.getActiveTimeline().filterInflightsExcludingCompaction().getInstants().count() == 1); + assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), nextInflightInstantTime); + assertEquals("Expect only one inflight instant", 1, metaClient.getActiveTimeline() + .filterInflightsExcludingCompaction().getInstants().count()); // Expect pending Compaction to be present pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); - assertTrue("Pending Compaction instant has expected instant time", - pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); + assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(), + compactionInstantTime); } } @@ -217,7 +207,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); scheduleCompaction(compactionInstantTime, client, cfg); - moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg); + moveCompactionFromRequestedToInflight(compactionInstantTime, cfg); // Complete ingestions runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false, @@ -245,13 
+235,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase { new ArrayList<>()); // Schedule compaction but do not run them - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); scheduleCompaction(compactionInstantTime, client, cfg); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieInstant pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); - assertTrue("Pending Compaction instant has expected instant time", - pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); + assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(), compactionInstantTime); boolean gotException = false; try { @@ -287,8 +275,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieInstant inflightInstant = metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); - assertTrue("inflight instant has expected instant time", - inflightInstant.getTimestamp().equals(inflightInstantTime)); + assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime); boolean gotException = false; try { @@ -314,10 +301,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); - records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); - HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); boolean gotException = false; try { // Schedule compaction but do not run them @@ -329,10 +315,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Schedule with timestamp same as that of committed instant gotException = false; - String dupCompactionInstantTime = secondInstantTime; try { // Schedule compaction but do not run them - scheduleCompaction(dupCompactionInstantTime, client, cfg); + scheduleCompaction(secondInstantTime, client, cfg); } catch (IllegalArgumentException iex) { gotException = true; } @@ -343,7 +328,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { gotException = false; try { // Schedule compaction with the same times as a pending compaction - scheduleCompaction(dupCompactionInstantTime, client, cfg); + scheduleCompaction(secondInstantTime, client, cfg); } catch (IllegalArgumentException iex) { gotException = true; } @@ -354,7 +339,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { public void testCompactionAfterTwoDeltaCommits() throws Exception { // No Delta Commits after compaction request HoodieWriteConfig cfg = getConfig(true); - try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) { + try (HoodieWriteClient client = getHoodieWriteClient(cfg, true)) { String firstInstantTime = "001"; String secondInstantTime = "004"; @@ -362,7 +347,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecs = 2000; List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); - records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, + runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); @@ -405,15 +390,14 @@ public class TestAsyncCompaction extends 
TestHoodieClientBase { private void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation, - HoodieWriteConfig cfg) throws IOException { + HoodieWriteConfig cfg) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable table = getHoodieTable(metaClient, cfg); - List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg); + List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table); fileSliceList.forEach(fileSlice -> { Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId()); if (opPair != null) { - assertTrue("Expect baseInstant to match compaction Instant", - fileSlice.getBaseInstantTime().equals(opPair.getKey())); + assertEquals("Expect baseInstant to match compaction Instant", fileSlice.getBaseInstantTime(), opPair.getKey()); assertTrue("Expect atleast one log file to be present where the latest delta commit was written", fileSlice.getLogFiles().count() > 0); assertFalse("Expect no data-file to be present", fileSlice.getDataFile().isPresent()); @@ -469,12 +453,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase { return records; } - private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteClient client, - HoodieWriteConfig cfg) throws IOException { + private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); - HoodieCompactionPlan workload = AvroUtils - .deserializeCompactionPlan(metaClient.getActiveTimeline().getInstantAuxiliaryDetails(compactionInstant).get()); 
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant); HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants() .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get(); @@ -499,19 +480,19 @@ public class TestAsyncCompaction extends TestHoodieClientBase { HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException { client.compact(compactionInstantTime); - List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg); + List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table); assertTrue("Ensure latest file-slices are not empty", fileSliceList.stream().findAny().isPresent()); assertFalse("Verify all file-slices have base-instant same as compaction instant", fileSliceList.stream() - .filter(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)).findAny().isPresent()); + .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime))); assertFalse("Verify all file-slices have data-files", - fileSliceList.stream().filter(fs -> !fs.getDataFile().isPresent()).findAny().isPresent()); + fileSliceList.stream().anyMatch(fs -> !fs.getDataFile().isPresent())); if (hasDeltaCommitAfterPendingCompaction) { assertFalse("Verify all file-slices have atleast one log-file", - fileSliceList.stream().filter(fs -> fs.getLogFiles().count() == 0).findAny().isPresent()); + fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0)); } else { assertFalse("Verify all file-slices have no log-files", - fileSliceList.stream().filter(fs -> fs.getLogFiles().count() > 0).findAny().isPresent()); + fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0)); } // verify that there is a commit @@ -554,16 +535,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase { FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath()); 
HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles); - List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList()); - return dataFilesToRead; + return view.getLatestDataFiles().collect(Collectors.toList()); } - private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table, HoodieWriteConfig cfg) throws IOException { + private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) { HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(), table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline()); - List<FileSlice> fileSliceList = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).stream() - .flatMap(partition -> view.getLatestFileSlices(partition)).collect(Collectors.toList()); - return fileSliceList; + return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) + .flatMap(view::getLatestFileSlices).collect(Collectors.toList()); } protected HoodieTableType getTableType() {