This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 11fd62b  Fix S3PinotFS List API may not return full results (#6002)
11fd62b is described below

commit 11fd62b77d84ba714828d0f85341e205f83c6c4e
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Fri Sep 11 02:37:45 2020 -0700

    Fix S3PinotFS List API may not return full results (#6002)
---
 .../spark/SparkSegmentGenerationJobRunner.java     |  1 +
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 61 +++++++++++++---------
 2 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index ad96e5d..c1b3f25 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -188,6 +188,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
       }
     }
 
+    LOGGER.info("Found {} files to create Pinot segments!", 
filteredFiles.size());
     try {
       JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
 
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index d70eadc..08e74c9 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
@@ -374,34 +375,42 @@ public class S3PinotFS extends PinotFS {
       throws IOException {
     try {
       ImmutableList.Builder<String> builder = ImmutableList.builder();
+      String continuationToken = null;
+      boolean isDone = false;
       String prefix = normalizeToDirectoryPrefix(fileUri);
-
-      ListObjectsV2Response listObjectsV2Response;
-      ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
-          ListObjectsV2Request.builder().bucket(fileUri.getHost());
-
-      if (!prefix.equals(DELIMITER)) {
-        listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.prefix(prefix);
-      }
-
-      if (!recursive) {
-        listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.delimiter(DELIMITER);
-      }
-
-      ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
-      listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
-
-      listObjectsV2Response.contents().stream().forEach(object -> {
-        //Only add files and not directories
-        if (!object.key().equals(fileUri.getPath()) && 
!object.key().endsWith(DELIMITER)) {
-          String fileKey = object.key();
-          if (fileKey.startsWith(DELIMITER)) {
-            fileKey = fileKey.substring(1);
-          }
-          builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
+      while(!isDone) {
+        ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
+            ListObjectsV2Request.builder().bucket(fileUri.getHost());
+        if (!prefix.equals(DELIMITER)) {
+          listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.prefix(prefix);
+        }
+        if (!recursive) {
+          listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.delimiter(DELIMITER);
         }
-      });
-      return builder.build().toArray(new String[0]);
+        if (continuationToken != null) {
+          listObjectsV2RequestBuilder.continuationToken(continuationToken);
+        }
+        ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
+        LOGGER.debug("Trying to send ListObjectsV2Request {}", 
listObjectsV2Request);
+        ListObjectsV2Response listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
+        LOGGER.debug("Getting ListObjectsV2Response: {}", 
listObjectsV2Response);
+        List<S3Object> filesReturned = listObjectsV2Response.contents();
+        filesReturned.stream().forEach(object -> {
+          //Only add files and not directories
+          if (!object.key().equals(fileUri.getPath()) && 
!object.key().endsWith(DELIMITER)) {
+            String fileKey = object.key();
+            if (fileKey.startsWith(DELIMITER)) {
+              fileKey = fileKey.substring(1);
+            }
+            builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
+          }
+        });
+        isDone = !listObjectsV2Response.isTruncated();
+        continuationToken = listObjectsV2Response.nextContinuationToken();
+      }
+      String[] listedFiles = builder.build().toArray(new String[0]);
+      LOGGER.info("Listed {} files from URI: {}, is recursive: {}", 
listedFiles.length, fileUri, recursive);
+      return listedFiles;
     } catch (Throwable t) {
       throw new IOException(t);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to