This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ce5f87  HADOOP-16920 ABFS: Make list page size configurable.
6ce5f87 is described below

commit 6ce5f8734f1864a2d628b23479cf3f6621b2fcb4
Author: bilaharith <52483117+bilahar...@users.noreply.github.com>
AuthorDate: Wed Mar 18 19:44:18 2020 +0530

    HADOOP-16920 ABFS: Make list page size configurable.
    
    
    Contributed by Bilahari T H.
    
    The page limit is set in "fs.azure.list.max.results"; default value is 500.
    
    There's currently a limit of 5000 in the store — there are no range checks
    in the client code, so that limit can be changed on the server without
    any need to update the abfs connector.
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      | 14 ++++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  4 +-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  1 +
 .../constants/FileSystemConfigurations.java        |  1 +
 .../apache/hadoop/fs/azurebfs/ITestAbfsClient.java | 90 ++++++++++++++++++++++
 5 files changed, 108 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 779f524..61fe3d8 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -125,6 +125,11 @@ public class AbfsConfiguration{
       DefaultValue = MAX_CONCURRENT_WRITE_THREADS)
   private int maxConcurrentWriteThreads;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_LIST_MAX_RESULTS,
+      MinValue = 1,
+      DefaultValue = DEFAULT_AZURE_LIST_MAX_RESULTS)
+  private int listMaxResults;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_CONCURRENT_CONNECTION_VALUE_IN,
       MinValue = 1,
       DefaultValue = MAX_CONCURRENT_READ_THREADS)
@@ -432,6 +437,10 @@ public class AbfsConfiguration{
     return this.maxConcurrentReadThreads;
   }
 
+  public int getListMaxResults() {
+    return this.listMaxResults;
+  }
+
   public boolean getTolerateOobAppends() {
     return this.tolerateOobAppends;
   }
@@ -702,6 +711,11 @@ public class AbfsConfiguration{
     this.disableOutputStreamFlush = disableOutputStreamFlush;
   }
 
+  @VisibleForTesting
+  void setListMaxResults(int listMaxResults) {
+    this.listMaxResults = listMaxResults;
+  }
+
   private String getTrimmedPasswordString(String key, String defaultValue) 
throws IOException {
     String value = getPasswordString(key);
     if (StringUtils.isBlank(value)) {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index bbf3608..bff0e45 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -126,7 +126,6 @@ public class AzureBlobFileSystemStore implements Closeable {
   private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss z";
   private static final String TOKEN_DATE_PATTERN = 
"yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
   private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
-  private static final int LIST_MAX_RESULTS = 500;
   private static final int GET_SET_AGGREGATE_COUNT = 2;
 
   private final AbfsConfiguration abfsConfiguration;
@@ -682,7 +681,8 @@ public class AzureBlobFileSystemStore implements Closeable {
     ArrayList<FileStatus> fileStatuses = new ArrayList<>();
     do {
       try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
-        AbfsRestOperation op = client.listPath(relativePath, false, 
LIST_MAX_RESULTS, continuation);
+        AbfsRestOperation op = client.listPath(relativePath, false,
+            abfsConfiguration.getListMaxResults(), continuation);
         perfInfo.registerResult(op.getResult());
         continuation = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
         ListResultSchema retrievedSchema = 
op.getResult().getListResultSchema();
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 3b0111e..a63e953 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -45,6 +45,7 @@ public final class ConfigurationKeys {
   public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = 
"fs.azure.concurrentRequestCount.out";
   public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = 
"fs.azure.concurrentRequestCount.in";
   public static final String AZURE_TOLERATE_CONCURRENT_APPEND = 
"fs.azure.io.read.tolerate.concurrent.append";
+  public static final String AZURE_LIST_MAX_RESULTS = 
"fs.azure.list.max.results";
   public static final String 
AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = 
"fs.azure.createRemoteFileSystemDuringInitialization";
   public static final String 
AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = 
"fs.azure.skipUserGroupMetadataDuringInitialization";
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = 
"fs.azure.enable.autothrottling";
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index c29ee90..c6b308e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -46,6 +46,7 @@ public final class FileSystemConfigurations {
   public static final int MAX_BUFFER_SIZE = 100 * ONE_MB;  // 100 MB
   public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // 
changing default abfs blocksize to 256MB
   public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
+  public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 500;
 
   public static final int MAX_CONCURRENT_READ_THREADS = 12;
   public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
index 3d6869d..a4d6458 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
@@ -18,13 +18,23 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
@@ -38,6 +48,7 @@ import static 
org.apache.hadoop.test.LambdaTestUtils.intercept;
  */
 public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
   private static final int LIST_MAX_RESULTS = 500;
+  private static final int LIST_MAX_RESULTS_SERVER = 5000;
 
   public ITestAbfsClient() throws Exception {
     super();
@@ -75,4 +86,83 @@ public final class ITestAbfsClient extends 
AbstractAbfsIntegrationTest {
             "UnknownHostException: " + fakeAccountName,
             () -> FileSystem.get(conf.getRawConfiguration()));
   }
+
+  @Test
+  public void testListPathWithValidListMaxResultsValues()
+      throws IOException, ExecutionException, InterruptedException {
+    final int fileCount = 10;
+    final String directory = "testWithValidListMaxResultsValues";
+    createDirectoryWithNFiles(directory, fileCount);
+    final int[] testData = {fileCount + 100, fileCount + 1, fileCount,
+        fileCount - 1, 1};
+    for (int i = 0; i < testData.length; i++) {
+      int listMaxResults = testData[i];
+      setListMaxResults(listMaxResults);
+      int expectedListResultsSize =
+          listMaxResults > fileCount ? fileCount : listMaxResults;
+      Assertions.assertThat(listPath(directory)).describedAs(
+          "AbfsClient.listPath result should contain %d items when "
+              + "listMaxResults is %d and directory contains %d items",
+          expectedListResultsSize, listMaxResults, fileCount)
+          .hasSize(expectedListResultsSize);
+    }
+  }
+
+  @Test
+  public void testListPathWithValueGreaterThanServerMaximum()
+      throws IOException, ExecutionException, InterruptedException {
+    setListMaxResults(LIST_MAX_RESULTS_SERVER + 100);
+    final String directory = "testWithValueGreaterThanServerMaximum";
+    createDirectoryWithNFiles(directory, LIST_MAX_RESULTS_SERVER + 200);
+    Assertions.assertThat(listPath(directory)).describedAs(
+        "AbfsClient.listPath result will contain a maximum of %d items "
+            + "even if listMaxResults >= %d or directory "
+            + "contains more than %d items", LIST_MAX_RESULTS_SERVER,
+        LIST_MAX_RESULTS_SERVER, LIST_MAX_RESULTS_SERVER)
+        .hasSize(LIST_MAX_RESULTS_SERVER);
+  }
+
+  @Test
+  public void testListPathWithInvalidListMaxResultsValues() throws Exception {
+    for (int i = -1; i < 1; i++) {
+      setListMaxResults(i);
+      intercept(AbfsRestOperationException.class, "Operation failed: \"One of "
+          + "the query parameters specified in the request URI is outside" + " 
"
+          + "the permissible range.", () -> listPath("directory"));
+    }
+  }
+
+  private List<ListResultEntrySchema> listPath(String directory)
+      throws IOException {
+    return getFileSystem().getAbfsClient()
+        .listPath(directory, false, getListMaxResults(), null).getResult()
+        .getListResultSchema().paths();
+  }
+
+  private int getListMaxResults() throws IOException {
+    return getFileSystem().getAbfsStore().getAbfsConfiguration()
+        .getListMaxResults();
+  }
+
+  private void setListMaxResults(int listMaxResults) throws IOException {
+    getFileSystem().getAbfsStore().getAbfsConfiguration()
+        .setListMaxResults(listMaxResults);
+  }
+
+  private void createDirectoryWithNFiles(String directory, int n)
+      throws ExecutionException, InterruptedException {
+    final List<Future<Void>> tasks = new ArrayList<>();
+    ExecutorService es = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < n; i++) {
+      final Path fileName = new Path("/" + directory + "/test" + i);
+      tasks.add(es.submit(() -> {
+        touch(fileName);
+        return null;
+      }));
+    }
+    for (Future<Void> task : tasks) {
+      task.get();
+    }
+    es.shutdownNow();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to