This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3790b75  [HUDI-368] code clean up in TestAsyncCompaction class (#1050)
3790b75 is described below

commit 3790b75e059a06e6f5467c8b8d549ef38cd6b98a
Author: Pratyaksh Sharma <pratyaks...@gmail.com>
AuthorDate: Wed Dec 11 03:22:41 2019 +0530

    [HUDI-368] code clean up in TestAsyncCompaction class (#1050)
---
 .../java/org/apache/hudi/TestAsyncCompaction.java  | 89 +++++++++-------------
 1 file changed, 34 insertions(+), 55 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
index 451a2b1..0643708 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
@@ -35,7 +35,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.AvroUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -108,17 +107,15 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
 
       HoodieInstant pendingCompactionInstant =
           
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-      assertTrue("Pending Compaction instant has expected instant time",
-          
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
-      assertTrue("Pending Compaction instant has expected state",
-          pendingCompactionInstant.getState().equals(State.REQUESTED));
+      assertEquals("Pending Compaction instant has expected instant time", 
pendingCompactionInstant.getTimestamp(),
+          compactionInstantTime);
+      assertEquals("Pending Compaction instant has expected state", 
pendingCompactionInstant.getState(), State.REQUESTED);
 
-      moveCompactionFromRequestedToInflight(compactionInstantTime, client, 
cfg);
+      moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
 
       // Reload and rollback inflight compaction
       metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, 
jsc);
-      hoodieTable.rollback(jsc, compactionInstantTime, false);
 
       client.rollbackInflightCompaction(
           new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
compactionInstantTime), hoodieTable);
@@ -139,11 +136,6 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
     }
   }
 
-  private Path getInstantPath(HoodieTableMetaClient metaClient, String 
timestamp, String action, State state) {
-    HoodieInstant instant = new HoodieInstant(state, action, timestamp);
-    return new Path(metaClient.getMetaPath(), instant.getFileName());
-  }
-
   @Test
   public void testRollbackInflightIngestionWithPendingCompaction() throws 
Exception {
     // Rollback inflight ingestion when there is pending compaction
@@ -171,12 +163,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
       metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
cfg.getBasePath());
       HoodieInstant pendingCompactionInstant =
           
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-      assertTrue("Pending Compaction instant has expected instant time",
-          
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
+      assertEquals("Pending Compaction instant has expected instant time", 
pendingCompactionInstant.getTimestamp(),
+          compactionInstantTime);
       HoodieInstant inflightInstant =
           
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
-      assertTrue("inflight instant has expected instant time",
-          inflightInstant.getTimestamp().equals(inflightInstantTime));
+      assertEquals("inflight instant has expected instant time", 
inflightInstant.getTimestamp(), inflightInstantTime);
 
       // This should rollback
       client.startCommitWithTime(nextInflightInstantTime);
@@ -184,14 +175,13 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
       // Validate
       metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
cfg.getBasePath());
       inflightInstant = 
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
-      assertTrue("inflight instant has expected instant time",
-          inflightInstant.getTimestamp().equals(nextInflightInstantTime));
-      assertTrue("Expect only one inflight instant",
-          metaClient.getActiveTimeline().filterInflightsExcludingCompaction().getInstants().count() == 1);
+      assertEquals("inflight instant has expected instant time", 
inflightInstant.getTimestamp(), nextInflightInstantTime);
+      assertEquals("Expect only one inflight instant", 1, 
metaClient.getActiveTimeline()
+          .filterInflightsExcludingCompaction().getInstants().count());
       // Expect pending Compaction to be present
       pendingCompactionInstant = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-      assertTrue("Pending Compaction instant has expected instant time",
-          
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
+      assertEquals("Pending Compaction instant has expected instant time", 
pendingCompactionInstant.getTimestamp(),
+          compactionInstantTime);
     }
   }
 
@@ -217,7 +207,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
       HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
       scheduleCompaction(compactionInstantTime, client, cfg);
-      moveCompactionFromRequestedToInflight(compactionInstantTime, client, 
cfg);
+      moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
 
       // Complete ingestions
       runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, 
fourthInstantTime), records, cfg, false,
@@ -245,13 +235,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
         new ArrayList<>());
 
     // Schedule compaction but do not run them
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     scheduleCompaction(compactionInstantTime, client, cfg);
-    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
cfg.getBasePath());
+    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     HoodieInstant pendingCompactionInstant =
         
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-    assertTrue("Pending Compaction instant has expected instant time",
-        pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
+    assertEquals("Pending Compaction instant has expected instant time", 
pendingCompactionInstant.getTimestamp(), compactionInstantTime);
 
     boolean gotException = false;
     try {
@@ -287,8 +275,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
cfg.getBasePath());
     HoodieInstant inflightInstant =
         
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
-    assertTrue("inflight instant has expected instant time",
-        inflightInstant.getTimestamp().equals(inflightInstantTime));
+    assertEquals("inflight instant has expected instant time", 
inflightInstant.getTimestamp(), inflightInstantTime);
 
     boolean gotException = false;
     try {
@@ -314,10 +301,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
     int numRecs = 2000;
 
     List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, 
numRecs);
-    records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, 
secondInstantTime), records, cfg, true,
+    runNextDeltaCommits(client, Arrays.asList(firstInstantTime, 
secondInstantTime), records, cfg, true,
         new ArrayList<>());
 
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     boolean gotException = false;
     try {
       // Schedule compaction but do not run them
@@ -329,10 +315,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
 
     // Schedule with timestamp same as that of committed instant
     gotException = false;
-    String dupCompactionInstantTime = secondInstantTime;
     try {
       // Schedule compaction but do not run them
-      scheduleCompaction(dupCompactionInstantTime, client, cfg);
+      scheduleCompaction(secondInstantTime, client, cfg);
     } catch (IllegalArgumentException iex) {
       gotException = true;
     }
@@ -343,7 +328,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
     gotException = false;
     try {
       // Schedule compaction with the same times as a pending compaction
-      scheduleCompaction(dupCompactionInstantTime, client, cfg);
+      scheduleCompaction(secondInstantTime, client, cfg);
     } catch (IllegalArgumentException iex) {
       gotException = true;
     }
@@ -354,7 +339,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testCompactionAfterTwoDeltaCommits() throws Exception {
     // No Delta Commits after compaction request
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true)) {
 
       String firstInstantTime = "001";
       String secondInstantTime = "004";
@@ -362,7 +347,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
       int numRecs = 2000;
 
       List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, 
numRecs);
-      records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, 
secondInstantTime), records, cfg, true,
+      runNextDeltaCommits(client, Arrays.asList(firstInstantTime, 
secondInstantTime), records, cfg, true,
           new ArrayList<>());
 
       HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -405,15 +390,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
 
   private void validateDeltaCommit(String latestDeltaCommit,
       final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> 
fgIdToCompactionOperation,
-      HoodieWriteConfig cfg) throws IOException {
+      HoodieWriteConfig cfg) {
     HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     HoodieTable table = getHoodieTable(metaClient, cfg);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
     fileSliceList.forEach(fileSlice -> {
       Pair<String, HoodieCompactionOperation> opPair = 
fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
       if (opPair != null) {
-        assertTrue("Expect baseInstant to match compaction Instant",
-            fileSlice.getBaseInstantTime().equals(opPair.getKey()));
+        assertEquals("Expect baseInstant to match compaction Instant", 
fileSlice.getBaseInstantTime(), opPair.getKey());
         assertTrue("Expect atleast one log file to be present where the latest 
delta commit was written",
             fileSlice.getLogFiles().count() > 0);
         assertFalse("Expect no data-file to be present", 
fileSlice.getDataFile().isPresent());
@@ -469,12 +453,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
     return records;
   }
 
-  private void moveCompactionFromRequestedToInflight(String 
compactionInstantTime, HoodieWriteClient client,
-      HoodieWriteConfig cfg) throws IOException {
+  private void moveCompactionFromRequestedToInflight(String 
compactionInstantTime, HoodieWriteConfig cfg) {
     HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     HoodieInstant compactionInstant = 
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
-    HoodieCompactionPlan workload = AvroUtils
-        
.deserializeCompactionPlan(metaClient.getActiveTimeline().getInstantAuxiliaryDetails(compactionInstant).get());
     
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
     HoodieInstant instant = 
metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
         .filter(in -> 
in.getTimestamp().equals(compactionInstantTime)).findAny().get();
@@ -499,19 +480,19 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
       HoodieWriteConfig cfg, int expectedNumRecs, boolean 
hasDeltaCommitAfterPendingCompaction) throws IOException {
 
     client.compact(compactionInstantTime);
-    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
     assertTrue("Ensure latest file-slices are not empty", 
fileSliceList.stream().findAny().isPresent());
     assertFalse("Verify all file-slices have base-instant same as compaction 
instant", fileSliceList.stream()
-        .filter(fs -> 
!fs.getBaseInstantTime().equals(compactionInstantTime)).findAny().isPresent());
+        .anyMatch(fs -> 
!fs.getBaseInstantTime().equals(compactionInstantTime)));
     assertFalse("Verify all file-slices have data-files",
-        fileSliceList.stream().filter(fs -> 
!fs.getDataFile().isPresent()).findAny().isPresent());
+        fileSliceList.stream().anyMatch(fs -> !fs.getDataFile().isPresent()));
 
     if (hasDeltaCommitAfterPendingCompaction) {
       assertFalse("Verify all file-slices have atleast one log-file",
-          fileSliceList.stream().filter(fs -> fs.getLogFiles().count() == 
0).findAny().isPresent());
+          fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 
0));
     } else {
       assertFalse("Verify all file-slices have no log-files",
-          fileSliceList.stream().filter(fs -> fs.getLogFiles().count() > 
0).findAny().isPresent());
+          fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0));
     }
 
     // verify that there is a commit
@@ -554,16 +535,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
     FileStatus[] allFiles = 
HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), 
cfg.getBasePath());
     HoodieTableFileSystemView view =
         new HoodieTableFileSystemView(table.getMetaClient(), 
table.getCompletedCommitsTimeline(), allFiles);
-    List<HoodieDataFile> dataFilesToRead = 
view.getLatestDataFiles().collect(Collectors.toList());
-    return dataFilesToRead;
+    return view.getLatestDataFiles().collect(Collectors.toList());
   }
 
-  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table, 
HoodieWriteConfig cfg) throws IOException {
+  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
     HoodieTableFileSystemView view = new 
HoodieTableFileSystemView(table.getMetaClient(),
         
table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
-    List<FileSlice> fileSliceList = 
Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).stream()
-        .flatMap(partition -> 
view.getLatestFileSlices(partition)).collect(Collectors.toList());
-    return fileSliceList;
+    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
+        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
   }
 
   protected HoodieTableType getTableType() {

Reply via email to