This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch fixing_s3_list_api
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 86659186e8e2196b199a5c2bb011dd560a5a524f
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Thu Sep 10 23:38:37 2020 -0700

    Fix S3PinotFS List API so that it returns full (non-truncated) results
---
 .../spark/SparkSegmentGenerationJobRunner.java     |  1 +
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 57 ++++++++++++----------
 2 files changed, 33 insertions(+), 25 deletions(-)

diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index ad96e5d..c1b3f25 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -188,6 +188,7 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
       }
     }
 
+    LOGGER.info("Found {} files to create Pinot segments!", 
filteredFiles.size());
     try {
       JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
 
diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index d70eadc..9d03c47 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
@@ -374,33 +375,39 @@ public class S3PinotFS extends PinotFS {
       throws IOException {
     try {
       ImmutableList.Builder<String> builder = ImmutableList.builder();
+      String continuationToken = null;
+      boolean isDone = false;
       String prefix = normalizeToDirectoryPrefix(fileUri);
-
-      ListObjectsV2Response listObjectsV2Response;
-      ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
-          ListObjectsV2Request.builder().bucket(fileUri.getHost());
-
-      if (!prefix.equals(DELIMITER)) {
-        listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.prefix(prefix);
-      }
-
-      if (!recursive) {
-        listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.delimiter(DELIMITER);
-      }
-
-      ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
-      listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request);
-
-      listObjectsV2Response.contents().stream().forEach(object -> {
-        //Only add files and not directories
-        if (!object.key().equals(fileUri.getPath()) && 
!object.key().endsWith(DELIMITER)) {
-          String fileKey = object.key();
-          if (fileKey.startsWith(DELIMITER)) {
-            fileKey = fileKey.substring(1);
-          }
-          builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
+      while(!isDone) {
+        ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
+            ListObjectsV2Request.builder().bucket(fileUri.getHost());
+        if (!prefix.equals(DELIMITER)) {
+          listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.prefix(prefix);
+        }
+        if (!recursive) {
+          listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.delimiter(DELIMITER);
         }
-      });
+        if (continuationToken != null) {
+          listObjectsV2RequestBuilder.continuationToken(continuationToken);
+        }
+        ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
+        LOGGER.debug("Trying to send ListObjectsV2Request {}", 
listObjectsV2Request);
+        ListObjectsV2Response listObjectsV2Response = 
_s3Client.listObjectsV2(listObjectsV2Request);
+        LOGGER.debug("Getting ListObjectsV2Response: {}", 
listObjectsV2Response);
+        List<S3Object> filesReturned = listObjectsV2Response.contents();
+        filesReturned.stream().forEach(object -> {
+          //Only add files and not directories
+          if (!object.key().equals(fileUri.getPath()) && 
!object.key().endsWith(DELIMITER)) {
+            String fileKey = object.key();
+            if (fileKey.startsWith(DELIMITER)) {
+              fileKey = fileKey.substring(1);
+            }
+            builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
+          }
+        });
+        isDone = !listObjectsV2Response.isTruncated();
+        continuationToken = listObjectsV2Response.nextContinuationToken();
+      }
       return builder.build().toArray(new String[0]);
     } catch (Throwable t) {
       throw new IOException(t);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to