This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a5556d3  HDDS-3286. BasicOzoneFileSystem  support batchDelete. (#814)
a5556d3 is described below

commit a5556d34365e3bd377f95805e358efdfe9717a8c
Author: micah zhao <[email protected]>
AuthorDate: Wed Jun 24 05:21:50 2020 +0800

    HDDS-3286. BasicOzoneFileSystem  support batchDelete. (#814)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   4 +
 .../common/src/main/resources/ozone-default.xml    |   8 +
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  15 ++
 .../ozone/client/protocol/ClientProtocol.java      |  11 ++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  18 ++
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 +
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  10 +
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  29 +++
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  42 ++++
 .../ozone/om/TestOzoneManagerHAWithData.java       |  41 ++++
 .../src/main/proto/OmClientProtocol.proto          |  12 ++
 .../interface-client/src/main/proto/proto.lock     |  42 ++++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  15 ++
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   3 +
 .../hadoop/ozone/om/request/OMClientRequest.java   |  34 ++++
 .../ozone/om/request/key/OMKeysDeleteRequest.java  | 214 +++++++++++++++++++++
 .../om/response/key/OMKeysDeleteResponse.java      | 130 +++++++++++++
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  19 ++
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |  56 ++++--
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  13 ++
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |   2 +
 21 files changed, 701 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 17cea82..7d46b01 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -393,6 +393,10 @@ public final class OzoneConfigKeys {
       "ozone.s3.token.max.lifetime";
   public static final String OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY_DEFAULT = "3m";
 
+  public static final String OZONE_FS_ITERATE_BATCH_SIZE =
+      "ozone.fs.iterate.batch-size";
+  public static final int OZONE_FS_ITERATE_BATCH_SIZE_DEFAULT = 100;
+
   // Ozone Client Retry and Failover configurations
   public static final String OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY =
       "ozone.client.failover.max.attempts";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b792cc9..49a9b9d 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1937,6 +1937,14 @@
   </property>
   
   <property>
+    <name>ozone.fs.iterate.batch-size</name>
+    <value>100</value>
+    <tag>OZONE, OZONEFS</tag>
+    <description>
+      The batch size used when deleting keys iteratively through BasicOzoneFileSystem.
+    </description>
+  </property>
+  <property>
     <name>ozone.manager.db.checkpoint.transfer.bandwidthPerSec</name>
     <value>0</value>
     <tag>OZONE</tag>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 87710ea..f4bfbcd 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -382,6 +382,21 @@ public class OzoneBucket extends WithMetadata {
     proxy.deleteKey(volumeName, name, key);
   }
 
+  /**
+   * Deletes the given list of keys from the bucket.
+   * @param keyList List of the key names to be deleted.
+   * @throws IOException
+   */
+  public void deleteKeys(List<String> keyList) throws IOException {
+    proxy.deleteKeys(volumeName, name, keyList);
+  }
+
+  /**
+   * Renames the key from fromKeyName to toKeyName.
+   * @param fromKeyName The original key name.
+   * @param toKeyName New key name.
+   * @throws IOException
+   */
   public void renameKey(String fromKeyName, String toKeyName)
       throws IOException {
     proxy.renameKey(volumeName, name, fromKeyName, toKeyName);
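
A minimal usage sketch of the OzoneBucket#deleteKeys API added above; the volume and bucket names and the client bootstrap are assumptions made for illustration only.

    import java.util.Arrays;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.client.OzoneBucket;
    import org.apache.hadoop.ozone.client.OzoneClient;
    import org.apache.hadoop.ozone.client.OzoneClientFactory;

    public class DeleteKeysSketch {
      public static void main(String[] args) throws Exception {
        try (OzoneClient client =
                 OzoneClientFactory.getRpcClient(new OzoneConfiguration())) {
          OzoneBucket bucket = client.getObjectStore()
              .getVolume("vol1").getBucket("bucket1");   // assumed names
          // A single call deletes the whole list of keys.
          bucket.deleteKeys(Arrays.asList("dir/file1", "dir/file2"));
        }
      }
    }
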
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index dbed053..9c662ef 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -295,6 +295,17 @@ public interface ClientProtocol {
       throws IOException;
 
   /**
+   * Deletes the given list of keys from a bucket.
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyNameList List of the key names to be deleted
+   * @throws IOException
+   */
+  void deleteKeys(String volumeName, String bucketName,
+                  List<String> keyNameList)
+      throws IOException;
+
+  /**
    * Renames an existing key within a bucket.
    * @param volumeName Name of the Volume
    * @param bucketName Name of the Bucket
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 8437699..dcdce10 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -714,6 +714,24 @@ public class RpcClient implements ClientProtocol {
   }
 
   @Override
+  public void deleteKeys(
+          String volumeName, String bucketName, List<String> keyNameList)
+          throws IOException {
+    HddsClientUtils.verifyResourceName(volumeName, bucketName);
+    Preconditions.checkNotNull(keyNameList);
+    List<OmKeyArgs> keyArgsList = new ArrayList<>();
+    for (String keyName: keyNameList) {
+      OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+          .setVolumeName(volumeName)
+          .setBucketName(bucketName)
+          .setKeyName(keyName)
+          .build();
+      keyArgsList.add(keyArgs);
+    }
+    ozoneManagerClient.deleteKeys(keyArgsList);
+  }
+
+  @Override
   public void renameKey(String volumeName, String bucketName,
       String fromKeyName, String toKeyName) throws IOException {
     verifyVolumeName(volumeName);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 12220cd..94bd33e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -247,6 +247,7 @@ public final class OmUtils {
     case CreateKey:
     case RenameKey:
     case DeleteKey:
+    case DeleteKeys:
     case CommitKey:
     case AllocateBlock:
     case InitiateMultiPartUpload:
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index b377cf2..b342ef2 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -225,6 +225,16 @@ public interface OzoneManagerProtocol
   void deleteKey(OmKeyArgs args) throws IOException;
 
   /**
+   * Deletes existing key/keys. This interface supports deleting
+   * multiple keys as well as a single key. It is used when deleting
+   * files through OzoneFileSystem.
+   *
+   * @param args the list of key arguments to delete.
+   * @throws IOException
+   */
+  void deleteKeys(List<OmKeyArgs> args) throws IOException;
+
+  /**
    * Deletes an existing empty bucket from volume.
    * @param volume - Name of the volume.
    * @param bucket - Name of the bucket.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 91eafe9..3c676eb 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateV
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest;
@@ -713,6 +714,34 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
   }
 
   /**
+   * Deletes existing key/keys. This interface supports deleting
+   * multiple keys as well as a single key.
+   *
+   * @param args the list of key arguments to delete.
+   * @throws IOException
+   */
+  @Override
+  public void deleteKeys(List<OmKeyArgs> args) throws IOException {
+    DeleteKeysRequest.Builder req = DeleteKeysRequest.newBuilder();
+    List<KeyArgs> keyArgsList = new ArrayList<>();
+    for (OmKeyArgs omKeyArgs : args) {
+      KeyArgs keyArgs = KeyArgs.newBuilder()
+          .setVolumeName(omKeyArgs.getVolumeName())
+          .setBucketName(omKeyArgs.getBucketName())
+          .setKeyName(omKeyArgs.getKeyName()).build();
+      keyArgsList.add(keyArgs);
+    }
+    req.addAllKeyArgs(keyArgsList);
+
+    OMRequest omRequest = createOMRequest(Type.DeleteKeys)
+        .setDeleteKeysRequest(req)
+        .build();
+
+    handleError(submitRequest(omRequest));
+
+  }
+
+  /**
    * Deletes an existing empty bucket from volume.
    * @param volume - Name of the volume.
    * @param bucket - Name of the bucket.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index c784897..ba3d643 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 
 import org.apache.commons.io.IOUtils;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -171,6 +172,7 @@ public class TestOzoneFileSystem {
 
     testCreateDoesNotAddParentDirKeys();
     testDeleteCreatesFakeParentDir();
+    testFileDelete();
     testNonExplicitlyCreatedPathExistsAfterItsLeafsWereRemoved();
 
     testRenameDir();
@@ -204,6 +206,8 @@ public class TestOzoneFileSystem {
 
     // Set the fs.defaultFS and start the filesystem
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Set the number of keys to be processed per batch operation.
+    conf.setInt(OZONE_FS_ITERATE_BATCH_SIZE, 5);
     fs = FileSystem.get(conf);
   }
 
@@ -262,6 +266,44 @@ public class TestOzoneFileSystem {
     assertEquals(parentKey, parentKeyInfo.getName());
   }
 
+  private void testFileDelete() throws Exception {
+    Path grandparent = new Path("/testBatchDelete");
+    Path parent = new Path(grandparent, "parent");
+    Path childFolder = new Path(parent, "childFolder");
+    // The batch size is 5, so use a key count that is not a multiple of 5
+    // in order to verify that a final batch smaller than the batch size is
+    // also deleted.
+    for (int i = 0; i < 8; i++) {
+      Path childFile = new Path(parent, "child" + i);
+      Path childFolderFile = new Path(childFolder, "child" + i);
+      ContractTestUtils.touch(fs, childFile);
+      ContractTestUtils.touch(fs, childFolderFile);
+    }
+
+    assertTrue(fs.listStatus(grandparent).length == 1);
+    assertTrue(fs.listStatus(parent).length == 9);
+    assertTrue(fs.listStatus(childFolder).length == 8);
+
+    Boolean successResult = fs.delete(grandparent, true);
+    assertTrue(successResult);
+    assertTrue(!o3fs.exists(grandparent));
+    for (int i = 0; i < 8; i++) {
+      Path childFile = new Path(parent, "child" + i);
+      // Make sure all keys under testBatchDelete/parent have been deleted.
+      assertTrue(!o3fs.exists(childFile));
+
+      // The delete was recursive, so make sure all keys under
+      // testBatchDelete/parent/childFolder have also been deleted.
+      Path childFolderFile = new Path(childFolder, "child" + i);
+      assertTrue(!o3fs.exists(childFolderFile));
+    }
+    // Deleting the already-removed parent logs "WARN ozone.BasicOzoneFileSystem
+    // delete: Path does not exist." and returns false.
+    Boolean falseResult = fs.delete(parent, true);
+    assertFalse(falseResult);
+
+  }
+
   private void testListStatus() throws Exception {
     Path parent = new Path("/testListStatus");
     Path file1 = new Path(parent, "key1");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
index 107ebfc..646b915 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
@@ -50,6 +50,7 @@ import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl.NODE_FAILURE_TIMEOU
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static org.junit.Assert.fail;
 
 /**
@@ -151,6 +152,46 @@ public class TestOzoneManagerHAWithData extends TestOzoneManagerHA {
 
   }
 
+  @Test
+  public void testKeysDelete() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String data = "random data";
+    String keyName1 = "dir/file1";
+    String keyName2 = "dir/file2";
+    String keyName3 = "dir/file3";
+    String keyName4 = "dir/file4";
+    List<String> keyList1 = new ArrayList<>();
+    keyList1.add(keyName2);
+    keyList1.add(keyName3);
+
+    testCreateFile(ozoneBucket, keyName1, data, true, false);
+    testCreateFile(ozoneBucket, keyName2, data, true, false);
+    testCreateFile(ozoneBucket, keyName3, data, true, false);
+    testCreateFile(ozoneBucket, keyName4, data, true, false);
+    ozoneBucket.getKey("dir/file1").getName();
+
+    // Delete keyName1 use deleteKey api.
+    ozoneBucket.deleteKey(keyName1);
+
+    // Delete keyName2 and keyName3 in keyList1 using the deleteKeys api.
+    ozoneBucket.deleteKeys(keyList1);
+
+    // In keyList2, keyName3 was previously deleted and keyName4 still exists.
+    List<String> keyList2 = new ArrayList<>();
+    keyList2.add(keyName3);
+    keyList2.add(keyName4);
+
+    // Because keyName3 has been deleted, there should be a KEY_NOT_FOUND
+    // exception. In this case, we test for deletion failure.
+    try {
+      ozoneBucket.deleteKeys(keyList2);
+      fail("testFilesDelete");
+    } catch (OMException ex) {
+      // The expected exception is KEY_NOT_FOUND.
+      Assert.assertEquals(KEY_NOT_FOUND, ex.getResult());
+    }
+  }
+
 
   @Test
   public void testFileOperationsWithNonRecursive() throws Exception {
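
The failure semantics exercised by testKeysDelete above can be summarized with the hedged sketch below: if any key in the list is missing, deleteKeys fails with KEY_NOT_FOUND and, judging from OMKeysDeleteRequest later in this patch, no key of that batch is committed. The helper class and key names are illustrative assumptions.

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.ozone.client.OzoneBucket;
    import org.apache.hadoop.ozone.om.exceptions.OMException;
    import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;

    public final class DeleteKeysFailureSketch {
      static void deleteOrReport(OzoneBucket bucket) throws IOException {
        try {
          // "dir/file3" is assumed to be already deleted, "dir/file4" to exist.
          bucket.deleteKeys(Arrays.asList("dir/file3", "dir/file4"));
        } catch (OMException e) {
          if (e.getResult() == ResultCodes.KEY_NOT_FOUND) {
            // The missing key caused the whole batch to be rejected.
          } else {
            throw e;
          }
        }
      }
    }
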
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 569be74..5c47cfa 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -59,6 +59,7 @@ enum Type {
   ListKeys = 35;
   CommitKey = 36;
   AllocateBlock = 37;
+  DeleteKeys = 38;
 
   InitiateMultiPartUpload = 45;
   CommitMultiPartUpload = 46;
@@ -124,6 +125,7 @@ message OMRequest {
   optional ListKeysRequest                  listKeysRequest                = 35;
   optional CommitKeyRequest                 commitKeyRequest               = 36;
   optional AllocateBlockRequest             allocateBlockRequest           = 37;
+  optional DeleteKeysRequest                deleteKeysRequest              = 38;
 
   optional MultipartInfoInitiateRequest     initiateMultiPartUploadRequest = 45;
   optional MultipartCommitUploadPartRequest commitMultiPartUploadRequest   = 46;
@@ -195,6 +197,7 @@ message OMResponse {
   optional ListKeysResponse                  listKeysResponse              = 35;
   optional CommitKeyResponse                 commitKeyResponse             = 36;
   optional AllocateBlockResponse             allocateBlockResponse         = 37;
+  optional DeleteKeysResponse                deleteKeysResponse            = 38;
 
   optional MultipartInfoInitiateResponse   initiateMultiPartUploadResponse = 45;
   optional MultipartCommitUploadPartResponse commitMultiPartUploadResponse = 46;
@@ -841,6 +844,10 @@ message DeleteKeyRequest {
     required KeyArgs keyArgs = 1;
 }
 
+message DeleteKeysRequest {
+    repeated KeyArgs keyArgs = 1;
+}
+
 message DeleteKeyResponse {
 
     optional KeyInfo keyInfo = 2;
@@ -856,6 +863,11 @@ message DeletedKeys {
     repeated string keys = 3;
 }
 
+message DeleteKeysResponse {
+    repeated KeyInfo deletedKeys = 1;
+    repeated KeyInfo unDeletedKeys = 2;
+}
+
 message PurgeKeysRequest {
     repeated DeletedKeys deletedKeys = 1;
 }
diff --git a/hadoop-ozone/interface-client/src/main/proto/proto.lock b/hadoop-ozone/interface-client/src/main/proto/proto.lock
index fad9e37..7e41dfa 100644
--- a/hadoop-ozone/interface-client/src/main/proto/proto.lock
+++ b/hadoop-ozone/interface-client/src/main/proto/proto.lock
@@ -80,6 +80,10 @@
                 "integer": 37
               },
               {
+                "name": "DeleteKeys",
+                "integer": 38
+              },
+              {
                 "name": "InitiateMultiPartUpload",
                 "integer": 45
               },
@@ -703,6 +707,11 @@
                 "type": "AllocateBlockRequest"
               },
               {
+                "id": 38,
+                "name": "deleteKeysRequest",
+                "type": "DeleteKeysRequest"
+              },
+              {
                 "id": 45,
                 "name": "initiateMultiPartUploadRequest",
                 "type": "MultipartInfoInitiateRequest"
@@ -969,6 +978,11 @@
                 "type": "AllocateBlockResponse"
               },
               {
+                "id": 38,
+                "name": "deleteKeysResponse",
+                "type": "DeleteKeysResponse"
+              },
+              {
                 "id": 45,
                 "name": "initiateMultiPartUploadResponse",
                 "type": "MultipartInfoInitiateResponse"
@@ -2401,6 +2415,17 @@
             ]
           },
           {
+            "name": "DeleteKeysRequest",
+            "fields": [
+              {
+                "id": 1,
+                "name": "keyArgs",
+                "type": "KeyArgs",
+                "is_repeated": true
+              }
+            ]
+          },
+          {
             "name": "DeleteKeyResponse",
             "fields": [
               {
@@ -2442,6 +2467,23 @@
             ]
           },
           {
+            "name": "DeleteKeysResponse",
+            "fields": [
+              {
+                "id": 1,
+                "name": "deletedKeys",
+                "type": "KeyInfo",
+                "is_repeated": true
+              },
+              {
+                "id": 2,
+                "name": "unDeletedKeys",
+                "type": "KeyInfo",
+                "is_repeated": true
+              }
+            ]
+          },
+          {
             "name": "PurgeKeysRequest",
             "fields": [
               {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 2fde315..a75aa0d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -2217,6 +2217,21 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  /**
+   * Deletes a list of existing keys.
+   *
+   * @param args - list of key arguments for the keys to delete.
+   * @throws IOException
+   */
+  @Override
+  public void deleteKeys(List<OmKeyArgs> args) throws IOException {
+    if (args != null) {
+      for (OmKeyArgs keyArgs : args) {
+        deleteKey(keyArgs);
+      }
+    }
+  }
+
   @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       String startKey, String keyPrefix, int maxKeys) throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index ddbda17..4aaaf13 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketSetAclRequest;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest;
 import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeysDeleteRequest;
 import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
@@ -124,6 +125,8 @@ public final class OzoneManagerRatisUtils {
       return new OMKeyCommitRequest(omRequest);
     case DeleteKey:
       return new OMKeyDeleteRequest(omRequest);
+    case DeleteKeys:
+      return new OMKeysDeleteRequest(omRequest);
     case RenameKey:
       return new OMKeyRenameRequest(omRequest);
     case CreateDirectory:
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 7f59b86..0353144 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -35,12 +36,15 @@ import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.WithObjectID;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .DeleteKeysResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
@@ -221,6 +225,36 @@ public abstract class OMClientRequest implements RequestAuditor {
   }
 
   /**
+   * Sets the parameters needed to return an error response to the client.
+   *
+   * @param omResponse
+   * @param ex         - IOException
+   * @param unDeletedKeys    - Set<OmKeyInfo>
+   * @return the error response to be returned to the client - OMResponse.
+   */
+  protected OMResponse createOperationKeysErrorOMResponse(
+      @Nonnull OMResponse.Builder omResponse,
+      @Nonnull IOException ex, @Nonnull Set<OmKeyInfo> unDeletedKeys) {
+    omResponse.setSuccess(false);
+    StringBuffer errorMsg = new StringBuffer();
+    DeleteKeysResponse.Builder resp = DeleteKeysResponse.newBuilder();
+    for (OmKeyInfo key : unDeletedKeys) {
+      if(key != null) {
+        resp.addUnDeletedKeys(key.getProtobuf());
+      }
+    }
+    if (errorMsg != null) {
+      omResponse.setMessage(errorMsg.toString());
+    }
+    // TODO: Currently all delete operations in OzoneBucket.java are void. Here
+    //  we put the list of unDeletedKeys into the response. These KeyInfo can
+    //  be used to continue deletion if the client supports delete retry.
+    omResponse.setDeleteKeysResponse(resp.build());
+    omResponse.setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(ex));
+    return omResponse.build();
+  }
+
+  /**
    * Add the client response to double buffer and set the flush future.
    * For responses which has status set to REPLAY it is a no-op.
    * @param trxIndex
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
new file mode 100644
index 0000000..b5e8dc8
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMReplayException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeyDeleteResponse;
+import org.apache.hadoop.ozone.om.response.key.OMKeysDeleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .DeleteKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .DeleteKeysResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+
+/**
+ * Handles DeleteKeys requests.
+ */
+public class OMKeysDeleteRequest extends OMKeyRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMKeysDeleteRequest.class);
+
+  public OMKeysDeleteRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    DeleteKeysRequest deleteKeyRequest =
+        getOmRequest().getDeleteKeysRequest();
+    Preconditions.checkNotNull(deleteKeyRequest);
+    List<KeyArgs> newKeyArgsList = new ArrayList<>();
+    for (KeyArgs keyArgs : deleteKeyRequest.getKeyArgsList()) {
+      newKeyArgsList.add(
+          keyArgs.toBuilder().setModificationTime(Time.now()).build());
+    }
+    DeleteKeysRequest newDeleteKeyRequest = DeleteKeysRequest
+        .newBuilder().addAllKeyArgs(newKeyArgsList).build();
+
+    return getOmRequest().toBuilder()
+        .setDeleteKeysRequest(newDeleteKeyRequest)
+        .setUserInfo(getUserInfo()).build();
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+    DeleteKeysRequest deleteKeyRequest =
+        getOmRequest().getDeleteKeysRequest();
+
+    List<KeyArgs> deleteKeyArgsList = deleteKeyRequest.getKeyArgsList();
+    Set<OmKeyInfo> unDeletedKeys = new HashSet<>();
+    IOException exception = null;
+    OMClientResponse omClientResponse = null;
+    Result result = null;
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumKeyDeletes();
+    Map<String, String> auditMap = null;
+    String volumeName = "";
+    String bucketName = "";
+    String keyName = "";
+    List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+    OzoneManagerProtocolProtos.UserInfo userInfo =
+        getOmRequest().getUserInfo();
+
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+        getOmRequest());
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    try {
+      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
+        volumeName = deleteKeyArgs.getVolumeName();
+        bucketName = deleteKeyArgs.getBucketName();
+        keyName = deleteKeyArgs.getKeyName();
+        String objectKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
+            keyName);
+        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
+        omKeyInfoList.add(omKeyInfo);
+        unDeletedKeys.add(omKeyInfo);
+      }
+
+      // Check whether any key in the batch cannot be deleted. If one
+      // exists, the whole batch delete fails.
+      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
+        volumeName = deleteKeyArgs.getVolumeName();
+        bucketName = deleteKeyArgs.getBucketName();
+        keyName = deleteKeyArgs.getKeyName();
+        auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
+        // check Acl
+        checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+            IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+
+        String objectKey = omMetadataManager.getOzoneKey(
+            volumeName, bucketName, keyName);
+
+        // Validate bucket and volume exists or not.
+        validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
+
+        if (omKeyInfo == null) {
+          throw new OMException("Key not found: " + keyName, KEY_NOT_FOUND);
+        }
+
+        // Check if this transaction is a replay of ratis logs.
+        if (isReplay(ozoneManager, omKeyInfo, trxnLogIndex)) {
+          // Replay implies the response has already been returned to
+          // the client. So take no further action and return a dummy
+          // OMClientResponse.
+          throw new OMReplayException();
+        }
+      }
+
+      omClientResponse = new OMKeysDeleteResponse(omResponse
+          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()).build(),
+          omKeyInfoList, trxnLogIndex, ozoneManager.isRatisEnabled());
+      result = Result.SUCCESS;
+    } catch (IOException ex) {
+      if (ex instanceof OMReplayException) {
+        result = Result.REPLAY;
+        omClientResponse = new OMKeyDeleteResponse(createReplayOMResponse(
+            omResponse));
+      } else {
+        result = Result.FAILURE;
+        exception = ex;
+
+        omClientResponse = new OMKeyDeleteResponse(
+            createOperationKeysErrorOMResponse(omResponse, exception,
+                unDeletedKeys));
+      }
+
+    } finally {
+      addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
+          omDoubleBufferHelper);
+    }
+
+    // Performing audit logging outside of the lock.
+    if (result != Result.REPLAY) {
+      auditLog(auditLogger, buildAuditMessage(
+          OMAction.DELETE_KEY, auditMap, exception, userInfo));
+    }
+
+    switch (result) {
+    case SUCCESS:
+      omMetrics.decNumKeys();
+      LOG.debug("Key deleted. Volume:{}, Bucket:{}, Key:{}", volumeName,
+          bucketName, keyName);
+      break;
+    case REPLAY:
+      LOG.debug("Replayed Transaction {} ignored. Request: {}",
+          trxnLogIndex, deleteKeyRequest);
+      break;
+    case FAILURE:
+      omMetrics.incNumKeyDeleteFails();
+      LOG.error("Key delete failed. Volume:{}, Bucket:{}, Key{}." +
+          " Exception:{}", volumeName, bucketName, keyName, exception);
+      break;
+    default:
+      LOG.error("Unrecognized Result for OMKeyDeleteRequest: {}",
+          deleteKeyRequest);
+    }
+
+    return omClientResponse;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
new file mode 100644
index 0000000..15231ad
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeysDeleteResponse.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.key;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Response for DeleteKeys request.
+ */
+public class OMKeysDeleteResponse extends OMClientResponse {
+  private List<OmKeyInfo> omKeyInfoList;
+  private boolean isRatisEnabled;
+  private long trxnLogIndex;
+
+  public OMKeysDeleteResponse(@Nonnull OMResponse omResponse,
+                              @Nonnull List<OmKeyInfo> omKeyInfoList,
+                              long trxnLogIndex, boolean isRatisEnabled) {
+    super(omResponse);
+    this.omKeyInfoList = omKeyInfoList;
+    this.isRatisEnabled = isRatisEnabled;
+    this.trxnLogIndex = trxnLogIndex;
+  }
+
+  /**
+   * For when the request is not successful or it is a replay transaction.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMKeysDeleteResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+                           BatchOperation batchOperation) throws IOException {
+
+    for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+      // Set the UpdateID to current transactionLogIndex
+      omKeyInfo.setUpdateID(trxnLogIndex, isRatisEnabled);
+
+      // For OmResponse with failure, this should do nothing. This method is
+      // not called in failure scenario in OM code.
+      if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
+        boolean acquiredLock = false;
+        String volumeName = "";
+        String bucketName = "";
+
+        try {
+          volumeName = omKeyInfo.getVolumeName();
+          bucketName = omKeyInfo.getBucketName();
+          String keyName = omKeyInfo.getKeyName();
+          acquiredLock =
+              omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+                  volumeName, bucketName);
+          // Update table cache.
+          omMetadataManager.getKeyTable().addCacheEntry(
+              new CacheKey<>(omMetadataManager.getOzoneKey(
+                  volumeName, bucketName, keyName)),
+              new CacheValue<>(Optional.absent(), trxnLogIndex));
+
+          String ozoneKey = omMetadataManager.getOzoneKey(
+              omKeyInfo.getVolumeName(), omKeyInfo.getBucketName(),
+              omKeyInfo.getKeyName());
+          omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
+              ozoneKey);
+          // If a deleted key is put in the table where a key with the same
+          // name already exists, then the old deleted key information would
+          // be lost. To avoid this, first check if a key with same name
+          // exists. deletedTable in OM Metadata stores <KeyName,
+          // RepeatedOMKeyInfo>. The RepeatedOmKeyInfo is the structure that
+          // allows us to store a list of OmKeyInfo that can be tied to same
+          // key name. For a keyName if RepeatedOMKeyInfo structure is null,
+          // we create a new instance, if it is not null, then we simply add
+          // to the list and store this instance in deletedTable.
+          RepeatedOmKeyInfo repeatedOmKeyInfo =
+              omMetadataManager.getDeletedTable().get(ozoneKey);
+          repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+              omKeyInfo, repeatedOmKeyInfo, omKeyInfo.getUpdateID(),
+              isRatisEnabled);
+          omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+              ozoneKey, repeatedOmKeyInfo);
+          if (acquiredLock) {
+            omMetadataManager.getLock().releaseWriteLock(
+                BUCKET_LOCK, volumeName, bucketName);
+            acquiredLock = false;
+          }
+        } finally {
+          if (acquiredLock) {
+            omMetadataManager.getLock()
+                .releaseWriteLock(BUCKET_LOCK, volumeName,
+                    bucketName);
+          }
+        }
+      }
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index defe3c1..a2f4c17 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -273,6 +273,25 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
     }
   }
 
+  /**
+   * Helper method to delete the objects specified by the key names in
+   * the bucket.
+   * @param keyNameList list of key names to be deleted
+   * @return true if the keys are deleted, false otherwise
+   */
+  @Override
+  public boolean deleteObjects(List<String> keyNameList) {
+    LOG.trace("issuing delete for key {}", keyNameList);
+    try {
+      incrementCounter(Statistic.OBJECTS_DELETED);
+      bucket.deleteKeys(keyNameList);
+      return true;
+    } catch (IOException ioe) {
+      LOG.error("delete key failed {}", ioe.getMessage());
+      return false;
+    }
+  }
+
   public FileStatusAdapter getFileStatus(String key, URI uri,
       Path qualifiedPath, String userName)
       throws IOException {
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index 632ef4c..f0df9b2 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.ozone;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
@@ -49,6 +50,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -61,6 +63,8 @@ import java.util.stream.Collectors;
 import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
 
@@ -269,9 +273,12 @@ public class BasicOzoneFileSystem extends FileSystem {
     }
 
     @Override
-    boolean processKey(String key) throws IOException {
-      String newKeyName = dstKey.concat(key.substring(srcKey.length()));
-      adapter.renameKey(key, newKeyName);
+    boolean processKey(List<String> keyList) throws IOException {
+      // TODO RenameKey needs to be changed to batch operation
+      for(String key : keyList) {
+        String newKeyName = dstKey.concat(key.substring(srcKey.length()));
+        adapter.renameKey(key, newKeyName);
+      }
       return true;
     }
   }
@@ -404,17 +411,12 @@ public class BasicOzoneFileSystem extends FileSystem {
     }
 
     @Override
-    boolean processKey(String key) throws IOException {
-      if (key.equals("")) {
-        LOG.trace("Skipping deleting root directory");
-        return true;
-      } else {
-        LOG.trace("deleting key:{}", key);
-        boolean succeed = adapter.deleteObject(key);
-        // if recursive delete is requested ignore the return value of
-        // deleteObject and issue deletes for other keys.
-        return recursive || succeed;
-      }
+    boolean processKey(List<String> key) throws IOException {
+      LOG.trace("deleting key:{}", key);
+      boolean succeed = adapter.deleteObjects(key);
+      // if recursive delete is requested, ignore the return value of
+      // deleteObjects and issue deletes for the remaining keys.
+      return recursive || succeed;
     }
   }
 
@@ -474,7 +476,9 @@ public class BasicOzoneFileSystem extends FileSystem {
       result = innerDelete(f, recursive);
     } else {
       LOG.debug("delete: Path is a file: {}", f);
-      result = adapter.deleteObject(key);
+      List<String> keyList = new ArrayList<>();
+      keyList.add(key);
+      result = adapter.deleteObjects(keyList);
     }
 
     if (result) {
@@ -729,7 +733,7 @@ public class BasicOzoneFileSystem extends FileSystem {
      * @return true if we should continue iteration of keys, false otherwise.
      * @throws IOException
      */
-    abstract boolean processKey(String key) throws IOException;
+    abstract boolean processKey(List<String> key) throws IOException;
 
     /**
      * Iterates through all the keys prefixed with the input path's key and
@@ -743,19 +747,35 @@ public class BasicOzoneFileSystem extends FileSystem {
      */
     boolean iterate() throws IOException {
       LOG.trace("Iterating path {}", path);
+      List<String> keyList = new ArrayList<>();
+      int batchSize = getConf().getInt(OZONE_FS_ITERATE_BATCH_SIZE,
+          OZONE_FS_ITERATE_BATCH_SIZE_DEFAULT);
       if (status.isDirectory()) {
         LOG.trace("Iterating directory:{}", pathKey);
         while (keyIterator.hasNext()) {
           BasicKeyInfo key = keyIterator.next();
           LOG.trace("iterating key:{}", key.getName());
-          if (!processKey(key.getName())) {
+          if (!key.getName().equals("")) {
+            keyList.add(key.getName());
+          }
+          if (keyList.size() >= batchSize) {
+            if (!processKey(keyList)) {
+              return false;
+            } else {
+              keyList.clear();
+            }
+          }
+        }
+        if (keyList.size() > 0) {
+          if (!processKey(keyList)) {
             return false;
           }
         }
         return true;
       } else {
         LOG.trace("iterating file:{}", path);
-        return processKey(pathKey);
+        keyList.add(pathKey);
+        return processKey(keyList);
       }
     }
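
To show the end-to-end effect of the batching added to iterate() above, here is a hedged o3fs-level sketch; the bucket/volume names, the path, and the batch size of 5 are assumptions borrowed from the test setup, not values required by the patch.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RecursiveBatchDeleteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("ozone.fs.iterate.batch-size", 5);   // assumed value
        try (FileSystem fs =
                 FileSystem.get(URI.create("o3fs://bucket1.vol1/"), conf)) {
          // Keys under /dir are collected into lists of at most 5 entries
          // and handed to OzoneClientAdapter#deleteObjects batch by batch.
          boolean deleted = fs.delete(new Path("/dir"), true);
          System.out.println("deleted = " + deleted);
        }
      }
    }
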
 
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 9eb8689..b4bb16f 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -454,6 +454,19 @@ public class BasicRootedOzoneClientAdapterImpl
   }
 
   /**
+   * Helper method to delete the objects specified by the key names in
+   * the bucket. Not yet supported by ofs; currently always returns false.
+   * @param pathList list of key names to be deleted
+   * @return true if the keys are deleted, false otherwise
+   */
+  @Override
+  public boolean deleteObjects(List<String> pathList) {
+    // TODO: we will support deleteObjects in ofs.
+    LOG.error("ofs currently does not support deleteObjects");
+    return false;
+  }
+
+  /**
    * Package-private helper function to reduce calls to getBucket().
    * @param bucket Bucket to operate in.
    * @param path Path to delete.
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index 97dc5b3..2b76c22 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -53,6 +53,8 @@ public interface OzoneClientAdapter {
 
   boolean deleteObject(String keyName);
 
+  boolean deleteObjects(List<String> keyName);
+
   Iterator<BasicKeyInfo> listKeys(String pathKey);
 
   List<FileStatusAdapter> listStatus(String keyName, boolean recursive,

