This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ca912a  [HUDI-667] Fixing delete tests for DeltaStreamer (#1395)
1ca912a is described below

commit 1ca912af0904283a270a822d5876babca5c89739
Author: Sivabalan Narayanan <sivab...@uber.com>
AuthorDate: Wed Mar 11 16:19:23 2020 -0700

    [HUDI-667] Fixing delete tests for DeltaStreamer (#1395)
---
 .../hudi/common/HoodieTestDataGenerator.java       | 42 +++++++++++-----------
 .../hudi/utilities/TestHoodieDeltaStreamer.java    |  4 +--
 2 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index e0d2a53..6d86e93 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -399,7 +399,6 @@ public class HoodieTestDataGenerator {
    */
   public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) {
     final Set<KeyPartition> used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique updates is greater than number of available keys");
     }
@@ -429,24 +428,24 @@ public class HoodieTestDataGenerator {
    */
   public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
     final Set<KeyPartition> used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
     }
 
-    return IntStream.range(0, n).boxed().map(i -> {
-      int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
-      KeyPartition kp = existingKeys.get(index);
-      // Find the available keyPartition starting from randomly chosen one.
-      while (used.contains(kp)) {
+    List<HoodieKey> result = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      int index = RAND.nextInt(numExistingKeys);
+      while (!existingKeys.containsKey(index)) {
         index = (index + 1) % numExistingKeys;
-        kp = existingKeys.get(index);
       }
-      existingKeys.remove(kp);
+      KeyPartition kp = existingKeys.remove(index);
+      existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
+      existingKeys.remove(numExistingKeys - 1);
       numExistingKeys--;
       used.add(kp);
-      return kp.key;
-    });
+      result.add(kp.key);
+    }
+    return result.stream();
   }
 
   /**
@@ -458,28 +457,29 @@ public class HoodieTestDataGenerator {
    */
   public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) {
     final Set<KeyPartition> used = new HashSet<>();
-
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
     }
 
-    return IntStream.range(0, n).boxed().map(i -> {
-      int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
-      KeyPartition kp = existingKeys.get(index);
-      // Find the available keyPartition starting from randomly chosen one.
-      while (used.contains(kp)) {
+    List<HoodieRecord> result = new ArrayList<>();
+    for (int i = 0; i < n; i++) {
+      int index = RAND.nextInt(numExistingKeys);
+      while (!existingKeys.containsKey(index)) {
         index = (index + 1) % numExistingKeys;
-        kp = existingKeys.get(index);
       }
-      existingKeys.remove(kp);
+      // swap chosen index with last index and remove last entry. 
+      KeyPartition kp = existingKeys.remove(index);
+      existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
+      existingKeys.remove(numExistingKeys - 1);
       numExistingKeys--;
       used.add(kp);
       try {
-        return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime));
+        result.add(new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime)));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
-    });
+    }
+    return result.stream();
   }
 
   public String[] getPartitionPaths() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 9d324dc..100faa2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -422,8 +422,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       } else {
         TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
       }
-      TestHelpers.assertRecordCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext);
-      TestHelpers.assertDistanceCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
       return true;
     }, 180);
     ds.shutdownGracefully();

Reply via email to