n3nash commented on a change in pull request #2349:
URL: https://github.com/apache/hudi/pull/2349#discussion_r549525014



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
##########
@@ -307,6 +308,125 @@ public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
     assertEquals(0, records3.stream().filter(record -> 
record.getCurrentLocation() != null).count());
   }
 
+  /**
+   * Verifies that for tagLocation() entries present in HBase, if the corresponding commit instant is missing
+   * from the timeline and the commit is not archived, tagLocation() resets the current record location to null.
+   */
+  @Test
+  public void testSimpleTagLocationWithInvalidCommit() throws Exception {
+    // Load to memory
+    HoodieWriteConfig config = getConfig();
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+
+    // make a commit with 199 records.
+    // NOTE(review): newCommitTime was previously started but never used; pass it explicitly,
+    // mirroring the invalidCommit call below.
+    String newCommitTime = writeClient.startCommit();
+    JavaRDD<HoodieRecord> writeRecords = generateAndCommitRecords(writeClient, 199, newCommitTime);
+
+    // make a second commit with a single record
+    String invalidCommit = writeClient.startCommit();
+    JavaRDD<HoodieRecord> invalidWriteRecords = generateAndCommitRecords(writeClient, 1, invalidCommit);
+
+    // verify location is tagged.
+    // JUnit assertions are used instead of the Java `assert` keyword, which is a no-op unless the JVM runs with -ea.
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    JavaRDD<HoodieRecord> javaRDD0 = index.tagLocation(invalidWriteRecords, context(), hoodieTable);
+    assertEquals(1, javaRDD0.count()); // one record present
+    assertEquals(1, javaRDD0.filter(HoodieRecord::isCurrentLocationKnown).count()); // it is tagged
+    assertEquals(invalidCommit, javaRDD0.collect().get(0).getCurrentLocation().getInstantTime());
+
+    // rollback the invalid commit, so that hbase will be left with a stale entry.
+    writeClient.rollback(invalidCommit);
+
+    // Now tagLocation for the valid records; hbaseIndex should tag all of them.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, context(), hoodieTable);
+    assertEquals(199, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).count());
+
+    // tagLocation for the invalid record - its commit is no longer in the timeline due to the rollback.
+    JavaRDD<HoodieRecord> javaRDD2 = index.tagLocation(invalidWriteRecords, context(), hoodieTable);
+    assertEquals(1, javaRDD2.count()); // one record present
+    assertEquals(0, javaRDD2.filter(HoodieRecord::isCurrentLocationKnown).count()); // it is not tagged
+  }
+
+  /*
+   * Test case to verify that taglocation() uses the commit timeline to 
validate the commitTS stored in hbase.
+   * When CheckIfValidCommit() in HbaseIndex uses the incorrect timeline 
filtering, this test would fail.
+   */
+  @Test
+  public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
+    // Load to memory
+    HoodieWriteConfig config = getConfig();
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+
+    String commitTime1 = writeClient.startCommit();
+    JavaRDD<HoodieRecord> writeRecords1 = 
generateAndCommitRecords(writeClient, 20, commitTime1);
+
+    // rollback the commit - leaves a clean file in timeline.
+    writeClient.rollback(commitTime1);
+
+    // create a second commit with 20 records
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    generateAndCommitRecords(writeClient, 20);
+
+    // Now tagLocation for the first set of rolledback records, hbaseIndex 
should tag them
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
+    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords1, 
context(), hoodieTable);
+    assert 
(javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 20);

Review comment:
       @nsivabalan ^




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to