This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 11fd62b Fix S3PinotFS List API may not return full results (#6002) 11fd62b is described below commit 11fd62b77d84ba714828d0f85341e205f83c6c4e Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Fri Sep 11 02:37:45 2020 -0700 Fix S3PinotFS List API may not return full results (#6002) --- .../spark/SparkSegmentGenerationJobRunner.java | 1 + .../apache/pinot/plugin/filesystem/S3PinotFS.java | 61 +++++++++++++--------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index ad96e5d..c1b3f25 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -188,6 +188,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri } } + LOGGER.info("Found {} files to create Pinot segments!", filteredFiles.size()); try { JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()); diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java index d70eadc..08e74c9 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java +++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java @@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; @@ -374,34 +375,42 @@ public class S3PinotFS extends PinotFS { throws IOException { try { ImmutableList.Builder<String> builder = ImmutableList.builder(); + String continuationToken = null; + boolean isDone = false; String prefix = normalizeToDirectoryPrefix(fileUri); - - ListObjectsV2Response listObjectsV2Response; - ListObjectsV2Request.Builder listObjectsV2RequestBuilder = - ListObjectsV2Request.builder().bucket(fileUri.getHost()); - - if (!prefix.equals(DELIMITER)) { - listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.prefix(prefix); - } - - if (!recursive) { - listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.delimiter(DELIMITER); - } - - ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build(); - listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request); - - listObjectsV2Response.contents().stream().forEach(object -> { - //Only add files and not directories - if (!object.key().equals(fileUri.getPath()) && !object.key().endsWith(DELIMITER)) { - String fileKey = object.key(); - if (fileKey.startsWith(DELIMITER)) { - fileKey = fileKey.substring(1); - } - builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey); + while(!isDone) { + ListObjectsV2Request.Builder listObjectsV2RequestBuilder = + ListObjectsV2Request.builder().bucket(fileUri.getHost()); + if (!prefix.equals(DELIMITER)) { + listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.prefix(prefix); + } + if (!recursive) { + listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.delimiter(DELIMITER); } - }); - return builder.build().toArray(new String[0]); + if (continuationToken != null) { + 
listObjectsV2RequestBuilder.continuationToken(continuationToken); + } + ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build(); + LOGGER.debug("Trying to send ListObjectsV2Request {}", listObjectsV2Request); + ListObjectsV2Response listObjectsV2Response = _s3Client.listObjectsV2(listObjectsV2Request); + LOGGER.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response); + List<S3Object> filesReturned = listObjectsV2Response.contents(); + filesReturned.stream().forEach(object -> { + //Only add files and not directories + if (!object.key().equals(fileUri.getPath()) && !object.key().endsWith(DELIMITER)) { + String fileKey = object.key(); + if (fileKey.startsWith(DELIMITER)) { + fileKey = fileKey.substring(1); + } + builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey); + } + }); + isDone = !listObjectsV2Response.isTruncated(); + continuationToken = listObjectsV2Response.nextContinuationToken(); + } + String[] listedFiles = builder.build().toArray(new String[0]); + LOGGER.info("Listed {} files from URI: {}, is recursive: {}", listedFiles.length, fileUri, recursive); + return listedFiles; } catch (Throwable t) { throw new IOException(t); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org