This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new afa6bc0  [HUDI-1723] Fix path selector listing files with the same mod date (#2845)
afa6bc0 is described below

commit afa6bc0b100450b5d80a27bf5b87cbd4a0fbb3a5
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue May 25 07:19:10 2021 -0700

    [HUDI-1723] Fix path selector listing files with the same mod date (#2845)
---
 .../hudi/common/testutils/FileCreateUtils.java     |   8 +
 .../utilities/sources/helpers/DFSPathSelector.java |  12 +-
 .../sources/helpers/DatePartitionPathSelector.java |  13 +-
 .../helpers/TestDFSPathSelectorCommonMethods.java  | 161 +++++++++++++++++++++
 4 files changed, 184 insertions(+), 10 deletions(-)
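
For context on the fix: both selectors used to checkpoint on the maximum
modification time of the batch and resume strictly after that timestamp, so
when the source limit cut a batch in the middle of a group of files sharing
one modification time, the unread files in that group were never picked up
again. Below is a minimal, self-contained sketch of that failure mode; the
file names, sizes, and timestamps are hypothetical, not taken from the patch.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Illustrates the pre-fix skip: three files share mod time 1000.
    class SkipDemo {
      static class F {
        final String name; final long len; final long modTime;
        F(String name, long len, long modTime) { this.name = name; this.len = len; this.modTime = modTime; }
      }

      public static void main(String[] args) {
        List<F> eligible = Arrays.asList(new F("f1", 10, 1000), new F("f2", 10, 1000), new F("f3", 10, 1000));
        long sourceLimit = 20;
        long currentBytes = 0;
        long maxModificationTime = Long.MIN_VALUE;
        List<F> batch = new ArrayList<>();
        for (F f : eligible) {
          if (currentBytes + f.len >= sourceLimit) {
            break; // old logic: stop on size alone
          }
          maxModificationTime = f.modTime;
          currentBytes += f.len;
          batch.add(f);
        }
        // Prints: batch=1 checkpoint=1000 -- only f1 was read, yet the next
        // poll filters on modTime > 1000, so f2 and f3 are skipped forever.
        System.out.println("batch=" + batch.size() + " checkpoint=" + maxModificationTime);
      }
    }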

diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 35f2cfe..b7754b0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -45,6 +45,8 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -224,6 +226,11 @@ public class FileCreateUtils {
 
   public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
       throws Exception {
+    createBaseFile(basePath, partitionPath, instantTime, fileId, length, Instant.now().toEpochMilli());
+  }
+
+  public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId, long length, long lastModificationTimeMilli)
+      throws Exception {
     Path parentPath = Paths.get(basePath, partitionPath);
     Files.createDirectories(parentPath);
     Path baseFilePath = parentPath.resolve(baseFileName(instantTime, fileId));
@@ -231,6 +238,7 @@ public class FileCreateUtils {
       Files.createFile(baseFilePath);
     }
     new RandomAccessFile(baseFilePath.toFile(), "rw").setLength(length);
+    Files.setLastModifiedTime(baseFilePath, FileTime.fromMillis(lastModificationTimeMilli));
   }
 
   public static void createLogFile(String basePath, String partitionPath, String instantTime, String fileId, int version)
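
The new createBaseFile overload above lets tests pin a file's last
modification time explicitly. A short usage sketch (the partition, instant,
file id, and timestamps are illustrative values, mirroring the tests below):

    import org.apache.hudi.common.testutils.FileCreateUtils;

    class FixedModTimeDemo {
      // Creates two base files that deliberately share one modification time,
      // so a test can exercise the same-mod-time checkpoint behavior.
      static void createSameModTimeFiles(String basePath) throws Exception {
        FileCreateUtils.createBaseFile(basePath, "p1", "000", "foo1", 10, 1000L);
        FileCreateUtils.createBaseFile(basePath, "p1", "000", "foo2", 10, 1000L);
      }
    }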
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index d9d3444..9301bf3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -121,28 +121,30 @@ public class DFSPathSelector implements Serializable {
       eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
       // Filter based on checkpoint & input size, if needed
       long currentBytes = 0;
-      long maxModificationTime = Long.MIN_VALUE;
+      long newCheckpointTime = lastCheckpointTime;
       List<FileStatus> filteredFiles = new ArrayList<>();
       for (FileStatus f : eligibleFiles) {
-        if (currentBytes + f.getLen() >= sourceLimit) {
+        if (currentBytes + f.getLen() >= sourceLimit && f.getModificationTime() > newCheckpointTime) {
           // we have enough data, we are done
+          // Also, we've read up to a file with a newer modification time
+          // so that some files with the same modification time won't be skipped in next read
           break;
         }
 
-        maxModificationTime = f.getModificationTime();
+        newCheckpointTime = f.getModificationTime();
         currentBytes += f.getLen();
         filteredFiles.add(f);
       }
 
       // no data to read
       if (filteredFiles.isEmpty()) {
-        return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
+        return new ImmutablePair<>(Option.empty(), String.valueOf(newCheckpointTime));
       }
 
       // read the files out.
       String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
 
-      return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime));
+      return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(newCheckpointTime));
     } catch (IOException ioe) {
       throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
     }
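
Tracing the fixed loop shows the intended semantics: the batch stops only
once the size budget is spent and the next file is strictly newer than the
running checkpoint, so a group of same-mod-time files is never split, even
at the cost of overshooting sourceLimit. Here is a runnable trace over the
fixture used in the tests below (foo1..foo3 at t=1000, foo4..foo5 at t=2000,
sourceLimit 20); the types are simplified stand-ins, not the Hadoop API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    class FixedLoopTrace {
      static class F {
        final String name; final long len; final long modTime;
        F(String name, long len, long modTime) { this.name = name; this.len = len; this.modTime = modTime; }
      }

      public static void main(String[] args) {
        List<F> eligible = Arrays.asList(
            new F("foo1", 10, 1000), new F("foo2", 10, 1000), new F("foo3", 10, 1000),
            new F("foo4", 10, 2000), new F("foo5", 10, 2000));
        long sourceLimit = 20;
        long currentBytes = 0;
        long newCheckpointTime = Long.MIN_VALUE;
        List<F> batch = new ArrayList<>();
        for (F f : eligible) {
          // Stop only when the budget is spent AND this file is strictly newer
          // than the running checkpoint, so the t=1000 group stays whole.
          if (currentBytes + f.len >= sourceLimit && f.modTime > newCheckpointTime) {
            break;
          }
          newCheckpointTime = f.modTime;
          currentBytes += f.len;
          batch.add(f);
        }
        // Prints: batch=[foo1, foo2, foo3] checkpoint=1000 -- three files are
        // read (30 bytes, past the 20-byte limit) and the checkpoint lands on
        // 1000, matching the same-mod-time test expectation below.
        System.out.println("batch=" + batch.stream().map(f -> f.name).collect(Collectors.toList())
            + " checkpoint=" + newCheckpointTime);
      }
    }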
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index 97106de..71e6a57 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -144,27 +144,30 @@ public class DatePartitionPathSelector extends DFSPathSelector {
 
     // Filter based on checkpoint & input size, if needed
     long currentBytes = 0;
+    long newCheckpointTime = lastCheckpointTime;
     List<FileStatus> filteredFiles = new ArrayList<>();
     for (FileStatus f : sortedEligibleFiles) {
-      if (currentBytes + f.getLen() >= sourceLimit) {
+      if (currentBytes + f.getLen() >= sourceLimit && f.getModificationTime() > newCheckpointTime) {
         // we have enough data, we are done
+        // Also, we've read up to a file with a newer modification time
+        // so that some files with the same modification time won't be skipped in next read
         break;
       }
 
+      newCheckpointTime = f.getModificationTime();
       currentBytes += f.getLen();
       filteredFiles.add(f);
     }
 
     // no data to read
     if (filteredFiles.isEmpty()) {
-      return new ImmutablePair<>(
-          Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
+      return new ImmutablePair<>(Option.empty(), String.valueOf(newCheckpointTime));
     }
 
     // read the files out.
     String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
-    long maxModificationTime = filteredFiles.get(filteredFiles.size() - 1).getModificationTime();
-    return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(maxModificationTime));
+
+    return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(newCheckpointTime));
   }
 
   /**
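
A related behavioral change in both selectors: on an empty read, the returned
checkpoint now echoes the incoming one (newCheckpointTime starts at
lastCheckpointTime) instead of resetting to Long.MIN_VALUE, so an empty poll
can no longer rewind progress. A hedged sketch of a caller relying on this;
the driver method is hypothetical, while the selector API is from the diff:

    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
    import org.apache.spark.api.java.JavaSparkContext;

    class CheckpointPollDemo {
      // Hypothetical driver step: after this fix, an empty poll returns the
      // checkpoint that was passed in, so the stored checkpoint never moves
      // backwards when no new files have landed.
      static String poll(DFSPathSelector selector, JavaSparkContext jsc, String lastCheckpoint) {
        Pair<Option<String>, String> result =
            selector.getNextFilePathsAndMaxModificationTime(jsc, Option.of(lastCheckpoint), 1024);
        // result.getRight() equals lastCheckpoint when result.getLeft() is empty.
        return result.getRight();
      }
    }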
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
new file mode 100644
index 0000000..08acc87
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.FileCreateUtils.createBaseFile;
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestDFSPathSelectorCommonMethods extends HoodieClientTestHarness {
+
+  TypedProperties props;
+  Path inputPath;
+
+  @BeforeEach
+  void setUp() {
+    initSparkContexts();
+    initPath();
+    initFileSystem();
+    props = new TypedProperties();
+    props.setProperty(ROOT_INPUT_PATH_PROP, basePath);
+    props.setProperty(PARTITIONS_LIST_PARALLELISM, "1");
+    inputPath = new Path(basePath);
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    cleanupResources();
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {DFSPathSelector.class, DatePartitionPathSelector.class})
+  public void listEligibleFilesShouldIgnoreCertainPrefixes(Class<?> clazz) throws Exception {
+    DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(clazz.getName(), props, hadoopConf);
+    createBaseFile(basePath, "p1", "000", "foo1", 1);
+    createBaseFile(basePath, "p1", "000", ".foo2", 1);
+    createBaseFile(basePath, "p1", "000", "_foo3", 1);
+
+    List<FileStatus> eligibleFiles = selector.listEligibleFiles(fs, inputPath, 0);
+    assertEquals(1, eligibleFiles.size());
+    assertTrue(eligibleFiles.get(0).getPath().getName().startsWith("foo1"));
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {DFSPathSelector.class, DatePartitionPathSelector.class})
+  public void listEligibleFilesShouldIgnore0LengthFiles(Class<?> clazz) throws Exception {
+    DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(clazz.getName(), props, hadoopConf);
+    createBaseFile(basePath, "p1", "000", "foo1", 1);
+    createBaseFile(basePath, "p1", "000", "foo2", 0);
+    createBaseFile(basePath, "p1", "000", "foo3", 0);
+
+    List<FileStatus> eligibleFiles = selector.listEligibleFiles(fs, inputPath, 0);
+    assertEquals(1, eligibleFiles.size());
+    assertTrue(eligibleFiles.get(0).getPath().getName().startsWith("foo1"));
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {DFSPathSelector.class, DatePartitionPathSelector.class})
+  public void listEligibleFilesShouldIgnoreFilesEarlierThanCheckpointTime(Class<?> clazz) throws Exception {
+    DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(clazz.getName(), props, hadoopConf);
+    createBaseFile(basePath, "p1", "000", "foo1", 1);
+    createBaseFile(basePath, "p1", "000", "foo2", 1);
+    createBaseFile(basePath, "p1", "000", "foo3", 1);
+
+    List<FileStatus> eligibleFiles = selector.listEligibleFiles(fs, inputPath, Long.MAX_VALUE);
+    assertEquals(0, eligibleFiles.size());
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {DFSPathSelector.class, DatePartitionPathSelector.class})
+  public void getNextFilePathsAndMaxModificationTimeShouldRespectSourceLimit(Class<?> clazz) throws Exception {
+    DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(clazz.getName(), props, hadoopConf);
+    createBaseFile(basePath, "p1", "000", "foo1", 10, 1000);
+    createBaseFile(basePath, "p1", "000", "foo2", 10, 2000);
+    createBaseFile(basePath, "p1", "000", "foo3", 10, 3000);
+    createBaseFile(basePath, "p1", "000", "foo4", 10, 4000);
+    createBaseFile(basePath, "p1", "000", "foo5", 10, 5000);
+    Pair<Option<String>, String> nextFilePathsAndCheckpoint = selector
+        .getNextFilePathsAndMaxModificationTime(jsc, Option.empty(), 30);
+    List<String> fileNames = Arrays
+        .stream(nextFilePathsAndCheckpoint.getLeft().get().split(","))
+        .map(p -> Paths.get(p).toFile().getName())
+        .sorted().collect(Collectors.toList());
+    assertEquals(2, fileNames.size());
+    assertTrue(fileNames.get(0).startsWith("foo1"));
+    assertTrue(fileNames.get(1).startsWith("foo2"));
+    String checkpointStr1stRead = nextFilePathsAndCheckpoint.getRight();
+    assertEquals(2000L, Long.parseLong(checkpointStr1stRead), "should read up to foo2 (inclusive)");
+  }
+
+  @ParameterizedTest
+  @ValueSource(classes = {DFSPathSelector.class, DatePartitionPathSelector.class})
+  public void getNextFilePathsAndMaxModificationTimeShouldIgnoreSourceLimitIfSameModTimeFilesPresent(Class<?> clazz) throws Exception {
+    DFSPathSelector selector = (DFSPathSelector) ReflectionUtils.loadClass(clazz.getName(), props, hadoopConf);
+    createBaseFile(basePath, "p1", "000", "foo1", 10, 1000);
+    createBaseFile(basePath, "p1", "000", "foo2", 10, 1000);
+    createBaseFile(basePath, "p1", "000", "foo3", 10, 1000);
+    createBaseFile(basePath, "p1", "000", "foo4", 10, 2000);
+    createBaseFile(basePath, "p1", "000", "foo5", 10, 2000);
+    Pair<Option<String>, String> nextFilePathsAndCheckpoint = selector
+        .getNextFilePathsAndMaxModificationTime(jsc, Option.empty(), 20);
+    List<String> fileNames1stRead = Arrays
+        .stream(nextFilePathsAndCheckpoint.getLeft().get().split(","))
+        .map(p -> Paths.get(p).toFile().getName())
+        .sorted().collect(Collectors.toList());
+    assertEquals(3, fileNames1stRead.size());
+    assertTrue(fileNames1stRead.get(0).startsWith("foo1"));
+    assertTrue(fileNames1stRead.get(1).startsWith("foo2"));
+    assertTrue(fileNames1stRead.get(2).startsWith("foo3"));
+    String checkpointStr1stRead = nextFilePathsAndCheckpoint.getRight();
+    assertEquals(1000L, Long.parseLong(checkpointStr1stRead), "should read up to foo3 (inclusive)");
+
+    nextFilePathsAndCheckpoint = selector
+        .getNextFilePathsAndMaxModificationTime(jsc, Option.of(checkpointStr1stRead), 20);
+    List<String> fileNames2ndRead = Arrays
+        .stream(nextFilePathsAndCheckpoint.getLeft().get().split(","))
+        .map(p -> Paths.get(p).toFile().getName())
+        .sorted().collect(Collectors.toList());
+    assertEquals(2, fileNames2ndRead.size());
+    assertTrue(fileNames2ndRead.get(0).startsWith("foo4"));
+    assertTrue(fileNames2ndRead.get(1).startsWith("foo5"));
+    String checkpointStr2ndRead = nextFilePathsAndCheckpoint.getRight();
+    assertEquals(2000L, Long.parseLong(checkpointStr2ndRead), "should read up to foo5 (inclusive)");
+  }
+}
