This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6112603  HDDS-4088. Adding Owner info for Authorizer plugin to honor owner access rights (#1395)
6112603 is described below

commit 6112603aca864bf18243fcfceb1a330ae7ac0587
Author: Xiaoyu Yao <[email protected]>
AuthorDate: Mon Oct 26 10:56:19 2020 -0700

    HDDS-4088. Adding Owner info for Authorizer plugin to honor owner access rights (#1395)
---
 .../hadoop/ozone/security/acl/OzoneObjInfo.java    |  10 +
 .../hadoop/ozone/security/acl/RequestContext.java  |  48 +++-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |   9 +-
 .../ozone/om/TestOzoneManagerListVolumes.java      |  36 +--
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  72 +++--
 .../hadoop/ozone/om/request/OMClientRequest.java   |  26 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  34 +++
 .../ozone/om/request/key/OMKeysDeleteRequest.java  |   4 +-
 .../ozone/om/request/key/OMKeysRenameRequest.java  |   9 +-
 .../request/key/acl/prefix/OMPrefixAclRequest.java |   7 +
 .../hadoop/ozone/om/request/util/ObjectParser.java |   6 +-
 .../ozone/security/acl/OzoneNativeAuthorizer.java  |  44 ++-
 .../hadoop/ozone/security/acl/TestVolumeOwner.java | 298 +++++++++++++++++++++
 13 files changed, 552 insertions(+), 51 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
index cbae18c..42ddbb9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
@@ -159,6 +159,16 @@ public final class OzoneObjInfo extends OzoneObj {
       return new Builder();
     }
 
+    public static Builder getBuilder(ResourceType resType,
+        StoreType storeType, String vol, String bucket, String key) {
+      return OzoneObjInfo.Builder.newBuilder()
+          .setResType(resType)
+          .setStoreType(storeType)
+          .setVolumeName(vol)
+          .setBucketName(bucket)
+          .setKeyName(key);
+    }
+
     public static Builder fromKeyArgs(OmKeyArgs args) {
       return new Builder()
           .setVolumeName(args.getVolumeName())
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
index 3295827..043cd55 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.security.acl;
 
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -32,16 +33,20 @@ public class RequestContext {
   private final String serviceId;
   private final ACLIdentityType aclType;
   private final ACLType aclRights;
+  private final String ownerName;
 
+  @SuppressWarnings("parameternumber")
   public RequestContext(String host, InetAddress ip,
       UserGroupInformation clientUgi, String serviceId,
-      ACLIdentityType aclType, ACLType aclRights) {
+      ACLIdentityType aclType, ACLType aclRights,
+      String ownerName) {
     this.host = host;
     this.ip = ip;
     this.clientUgi = clientUgi;
     this.serviceId = serviceId;
     this.aclType = aclType;
     this.aclRights = aclRights;
+    this.ownerName = ownerName;
   }
 
   /**
@@ -55,6 +60,12 @@ public class RequestContext {
     private IAccessAuthorizer.ACLIdentityType aclType;
     private IAccessAuthorizer.ACLType aclRights;
 
+    /**
+     *  ownerName is specially added to allow
+     *  authorizer to honor owner privilege.
+     */
+    private String ownerName;
+
     public Builder setHost(String bHost) {
       this.host = bHost;
       return this;
@@ -80,14 +91,23 @@ public class RequestContext {
       return this;
     }
 
+    public ACLType getAclRights() {
+      return this.aclRights;
+    }
+
     public Builder setAclRights(ACLType aclRight) {
       this.aclRights = aclRight;
       return this;
     }
 
+    public Builder setOwnerName(String owner) {
+      this.ownerName = owner;
+      return this;
+    }
+
     public RequestContext build() {
       return new RequestContext(host, ip, clientUgi, serviceId, aclType,
-          aclRights);
+          aclRights, ownerName);
     }
   }
 
@@ -95,6 +115,27 @@ public class RequestContext {
     return new Builder();
   }
 
+  public static RequestContext.Builder getBuilder(
+      UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
+      ACLType aclType, String ownerName) {
+    RequestContext.Builder contextBuilder = RequestContext.newBuilder()
+        .setClientUgi(ugi)
+        .setIp(remoteAddress)
+        .setHost(hostName)
+        .setAclType(ACLIdentityType.USER)
+        .setAclRights(aclType)
+        .setOwnerName(ownerName);
+    return contextBuilder;
+  }
+
+  public static RequestContext.Builder getBuilder(UserGroupInformation ugi,
+      ACLType aclType, String ownerName) {
+    return getBuilder(ugi,
+        ProtobufRpcEngine.Server.getRemoteIp(),
+        ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
+        aclType, ownerName);
+  }
+
   public String getHost() {
     return host;
   }
@@ -119,4 +160,7 @@ public class RequestContext {
     return aclRights;
   }
 
+  public String getOwnerName() {
+    return ownerName;
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
index 34303ff..271109f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
@@ -116,7 +116,14 @@ public class TestOmAcls {
 
     String volumeName = RandomStringUtils.randomAlphabetic(5).toLowerCase();
     String bucketName = RandomStringUtils.randomAlphabetic(5).toLowerCase();
-    cluster.getClient().getObjectStore().createVolume(volumeName);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner("user" + RandomStringUtils.randomNumeric(5))
+        .setAdmin("admin" + RandomStringUtils.randomNumeric(5))
+        .build();
+
+    cluster.getClient().getObjectStore().createVolume(volumeName,
+        createVolumeArgs);
     OzoneVolume volume =
         cluster.getClient().getObjectStore().getVolume(volumeName);
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
index d7aaf37..a47aa08 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
@@ -201,16 +201,16 @@ public class TestOzoneManagerListVolumes {
 
     // Login as user1, list other users' volumes
     UserGroupInformation.setLoginUser(user1);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), true);
 
     // Add "s3v" created default by OM.
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
         "volume4", "volume5", "s3v"), true);
 
     UserGroupInformation.setLoginUser(user2);
-    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
-        true);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+        "volume5"), true);
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
         "volume4", "volume5", "s3v"), true);
 
@@ -229,18 +229,18 @@ public class TestOzoneManagerListVolumes {
 
     // Login as user1, list other users' volumes, expect failure
     UserGroupInformation.setLoginUser(user1);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        false);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), false);
     // Add "s3v" created default by OM.
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
         "volume4", "volume5", "s3v"), false);
 
     // While admin should be able to list volumes just fine.
     UserGroupInformation.setLoginUser(adminUser);
-    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
-        true);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        true);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+        "volume5"), true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), true);
 
     stopCluster(cluster);
   }
@@ -249,10 +249,10 @@ public class TestOzoneManagerListVolumes {
   public void testAclEnabledListAllAllowed() throws Exception {
     // ozone.acl.enabled = true, ozone.om.volume.listall.allowed = true
     MiniOzoneCluster cluster = startCluster(true, true);
-    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
-        true);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        true);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+        "volume5"), true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), true);
 
     // Add "s3v" created default by OM.
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2", "volume3",
@@ -267,11 +267,11 @@ public class TestOzoneManagerListVolumes {
     // The default user is adminUser as set in init(),
     // listall always succeeds if we use that UGI, we should use non-admin here
     UserGroupInformation.setLoginUser(user1);
-    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
-        false);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+        "volume5"), false);
     UserGroupInformation.setLoginUser(user2);
-    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
-        false);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+        "volume5"), false);
     UserGroupInformation.setLoginUser(adminUser);
     // Add "s3v" created default by OM.
     checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2",
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index e58af8b..258564c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -226,6 +226,7 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVA
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
 
 import org.apache.hadoop.util.Time;
@@ -528,6 +529,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         authorizer.setKeyManager(keyManager);
         authorizer.setPrefixManager(prefixManager);
         authorizer.setOzoneAdmins(getOzoneAdmins(configuration));
+        authorizer.setAllowListAllVolumes(allowListAllVolumes);
       }
     } else {
       accessAuthorizer = null;
@@ -1627,7 +1629,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         ProtobufRpcEngine.Server.getRemoteUser(),
         ProtobufRpcEngine.Server.getRemoteIp(),
         ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
-        true);
+        true, getVolumeOwner(vol, acl));
   }
 
   /**
@@ -1642,25 +1644,46 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
           UserGroupInformation.createRemoteUser(userName),
           ProtobufRpcEngine.Server.getRemoteIp(),
           ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
-          false);
+          false, getVolumeOwner(vol, acl));
     } catch (OMException ex) {
       // Should not trigger exception here at all
       return false;
     }
   }
 
-  /**
-   * CheckAcls for the ozone object.
-   *
-   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
-   */
-  @SuppressWarnings("parameternumber")
-  public void checkAcls(ResourceType resType, StoreType storeType,
-      ACLType aclType, String vol, String bucket, String key,
-      UserGroupInformation ugi, InetAddress remoteAddress, String hostName)
-      throws OMException {
-    checkAcls(resType, storeType, aclType, vol, bucket, key,
-        ugi, remoteAddress, hostName, true);
+  public String getVolumeOwner(String vol, ACLType type) throws OMException {
+    String volOwnerName = null;
+    if (!vol.equals(OzoneConsts.OZONE_ROOT) && (type != ACLType.CREATE)) {
+      volOwnerName = getVolumeOwner(vol);
+    }
+    return volOwnerName;
+  }
+
+  private String getVolumeOwner(String volume) throws OMException {
+    Boolean lockAcquired = metadataManager.getLock().acquireReadLock(
+        VOLUME_LOCK, volume);
+    String dbVolumeKey = metadataManager.getVolumeKey(volume);
+    OmVolumeArgs volumeArgs = null;
+    try {
+      volumeArgs = metadataManager.getVolumeTable().get(dbVolumeKey);
+    } catch (IOException ioe) {
+      if (ioe instanceof OMException) {
+        throw (OMException)ioe;
+      } else {
+        throw new OMException("getVolumeOwner for Volume " + volume + " failed",
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } finally {
+      if (lockAcquired) {
+        metadataManager.getLock().releaseReadLock(VOLUME_LOCK, volume);
+      }
+    }
+    if (volumeArgs != null) {
+      return volumeArgs.getOwnerName();
+    } else {
+      throw new OMException("Volume " + volume + " is not found",
+          OMException.ResultCodes.VOLUME_NOT_FOUND);
+    }
   }
 
   /**
@@ -1671,10 +1694,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    *                     and throwOnPermissionDenied set to true.
    */
   @SuppressWarnings("parameternumber")
-  private boolean checkAcls(ResourceType resType, StoreType storeType,
+  public boolean checkAcls(ResourceType resType, StoreType storeType,
       ACLType aclType, String vol, String bucket, String key,
       UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
-      boolean throwIfPermissionDenied)
+      boolean throwIfPermissionDenied, String volumeOwner)
       throws OMException {
     OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
         .setResType(resType)
@@ -1688,13 +1711,17 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         .setHost(hostName)
         .setAclType(ACLIdentityType.USER)
         .setAclRights(aclType)
+        .setOwnerName(volumeOwner)
         .build();
     if (!accessAuthorizer.checkAccess(obj, context)) {
       if (throwIfPermissionDenied) {
         LOG.warn("User {} doesn't have {} permission to access {} /{}/{}/{}",
-            ugi.getUserName(), aclType, resType, vol, bucket, key);
-        throw new OMException("User " + ugi.getUserName() + " doesn't have " +
-            aclType + " permission to access " + resType,
+            context.getClientUgi().getUserName(), context.getAclRights(),
+            obj.getResourceType(), obj.getVolumeName(), obj.getBucketName(),
+            obj.getKeyName());
+        throw new OMException("User " + context.getClientUgi().getUserName() +
+            " doesn't have " + context.getAclRights() +
+            " permission to access " + obj.getResourceType(),
             ResultCodes.PERMISSION_DENIED);
       }
       return false;
@@ -3497,7 +3524,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return new ResolvedBucket(requested, resolved);
   }
 
-
   public ResolvedBucket resolveBucketLink(Pair<String, String> requested)
       throws IOException {
 
@@ -3548,9 +3574,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
 
     if (isAclEnabled) {
-      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ,
+      final ACLType type = ACLType.READ;
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, type,
           volumeName, bucketName, null, userGroupInformation,
-          remoteAddress, hostName);
+          remoteAddress, hostName, true,
+          getVolumeOwner(volumeName, type));
     }
 
     return resolveBucketLink(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 728a624..674ee08 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -148,12 +148,36 @@ public abstract class OMClientRequest implements RequestAuditor {
    * @param key
    * @throws IOException
    */
+  @SuppressWarnings("parameternumber")
   public void checkAcls(OzoneManager ozoneManager,
       OzoneObj.ResourceType resType,
       OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
       String vol, String bucket, String key) throws IOException {
+    checkAcls(ozoneManager, resType, storeType, aclType, vol, bucket, key,
+        ozoneManager.getVolumeOwner(vol, aclType));
+  }
+
+  /**
+   * Check Acls of ozone object with volOwner given.
+   * @param ozoneManager
+   * @param resType
+   * @param storeType
+   * @param aclType
+   * @param vol
+   * @param bucket
+   * @param key
+   * @param volOwner
+   * @throws IOException
+   */
+  @SuppressWarnings("parameternumber")
+  public void checkAcls(OzoneManager ozoneManager,
+      OzoneObj.ResourceType resType,
+      OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
+      String vol, String bucket, String key, String volOwner)
+      throws IOException {
     ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
-        createUGI(), getRemoteAddress(), getHostName());
+        createUGI(), getRemoteAddress(), getHostName(), true,
+        volOwner);
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index a048533..ee48f9b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -198,6 +198,19 @@ public abstract class OMKeyRequest extends OMClientRequest {
     }
   }
 
+  // For keys batch delete and rename only
+  protected String getVolumeOwner(OMMetadataManager omMetadataManager,
+      String volumeName) throws IOException {
+    String dbVolumeKey = omMetadataManager.getVolumeKey(volumeName);
+    OmVolumeArgs volumeArgs =
+        omMetadataManager.getVolumeTable().get(dbVolumeKey);
+    if (volumeArgs == null) {
+      throw new OMException("Volume not found " + volumeName,
+          VOLUME_NOT_FOUND);
+    }
+    return volumeArgs.getOwnerName();
+  }
+
   protected static Optional<FileEncryptionInfo> getFileEncryptionInfo(
       OzoneManager ozoneManager, OmBucketInfo bucketInfo) throws IOException {
     Optional<FileEncryptionInfo> encInfo = Optional.absent();
@@ -437,6 +450,27 @@ public abstract class OMKeyRequest extends OMClientRequest {
   }
 
   /**
+   * Check Acls for the ozone key with volumeOwner.
+   * @param ozoneManager
+   * @param volume
+   * @param bucket
+   * @param key
+   * @param aclType
+   * @param resourceType
+   * @throws IOException
+   */
+  @SuppressWarnings("parameternumber")
+  protected void checkKeyAcls(OzoneManager ozoneManager, String volume,
+      String bucket, String key, IAccessAuthorizer.ACLType aclType,
+      OzoneObj.ResourceType resourceType, String volumeOwner)
+      throws IOException {
+    if (ozoneManager.getAclsEnabled()) {
+      checkAcls(ozoneManager, resourceType, OzoneObj.StoreType.OZONE, aclType,
+          volume, bucket, key, volumeOwner);
+    }
+  }
+
+  /**
    * Check ACLs for Ozone Key in OpenKey table
    * if ozone native authorizer is enabled.
    * @param ozoneManager
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
index c6e7b9b..71e15f5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -124,6 +124,7 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
           volumeName, bucketName);
       // Validate bucket and volume exists or not.
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+      String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
 
       for (indexFailed = 0; indexFailed < length; indexFailed++) {
         String keyName = deleteKeyArgs.getKeys(indexFailed);
@@ -143,7 +144,8 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
         try {
           // check Acl
           checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY,
+              volumeOwner);
           omKeyInfoList.add(omKeyInfo);
         } catch (Exception ex) {
           deleteStatus = false;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
index abaa4ae..556c6f5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
@@ -117,6 +117,9 @@ public class OMKeysRenameRequest extends OMKeyRequest {
           omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
               volumeName, bucketName);
 
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+      String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
       for (RenameKeysMap renameKey : renameKeysArgs.getRenameKeysMapList()) {
 
         fromKeyName = renameKey.getFromKeyName();
@@ -137,9 +140,11 @@ public class OMKeysRenameRequest extends OMKeyRequest {
           // check Acls to see if user has access to perform delete operation
           // on old key and create operation on new key
           checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
-              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY,
+              volumeOwner);
           checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
-              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY,
+              volumeOwner);
         } catch (Exception ex) {
           renameStatus = false;
           unRenamedKeys.add(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
index e928402..6fbd7d2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
@@ -29,7 +29,9 @@ import org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.ObjectParser;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -71,6 +73,11 @@ public abstract class OMPrefixAclRequest extends OMClientRequest {
         (PrefixManagerImpl) ozoneManager.getPrefixManager();
     try {
       String prefixPath = getOzoneObj().getPath();
+      ObjectParser objectParser = new ObjectParser(prefixPath,
+          OzoneManagerProtocolProtos.OzoneObj.ObjectType.PREFIX);
+      volume = objectParser.getVolume();
+      bucket = objectParser.getBucket();
+      key = objectParser.getKey();
 
       // check Acl
       if (ozoneManager.getAclsEnabled()) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
index c12cdac..9b82702 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
@@ -43,7 +43,6 @@ public class ObjectParser {
   public ObjectParser(String path, ObjectType objectType) throws OMException {
     Preconditions.checkNotNull(path);
     String[] tokens = StringUtils.split(path, OZONE_URI_DELIMITER, 3);
-
     if (objectType == ObjectType.VOLUME && tokens.length == 1) {
       volume = tokens[0];
     } else if (objectType == ObjectType.BUCKET && tokens.length == 2) {
@@ -53,6 +52,11 @@ public class ObjectParser {
       volume = tokens[0];
       bucket = tokens[1];
       key = tokens[2];
+    } else if (objectType == ObjectType.PREFIX && tokens.length >= 1) {
+      volume = tokens[0];
+      if (tokens.length >= 2) {
+        bucket = tokens[1];
+      }
     } else {
       throw new OMException("Illegal path " + path,
           OMException.ResultCodes.INVALID_PATH_IN_ACL_REQUEST);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
index df98e20..aebefdc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
@@ -50,6 +50,7 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
   private KeyManager keyManager;
   private PrefixManager prefixManager;
   private Collection<String> ozAdmins;
+  private boolean allowListAllVolumes;
 
   public OzoneNativeAuthorizer() {
   }
@@ -87,14 +88,18 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
           "configured to work with OzoneObjInfo type only.", INVALID_REQUEST);
     }
 
-    // by pass all checks for admin
+    // bypass all checks for admin
     boolean isAdmin = isAdmin(context.getClientUgi());
     if (isAdmin) {
       return true;
     }
 
+    boolean isOwner = isOwner(context.getClientUgi(), context.getOwnerName());
     boolean isListAllVolume = ((context.getAclRights() == ACLType.LIST) &&
         objInfo.getVolumeName().equals(OzoneConsts.OZONE_ROOT));
+    if (isListAllVolume) {
+      return getAllowListAllVolumes();
+    }
 
     // For CREATE and DELETE acl requests, the parents need to be checked
     // for WRITE acl. If Key create request is received, then we need to
@@ -114,13 +119,19 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
     switch (objInfo.getResourceType()) {
     case VOLUME:
       LOG.trace("Checking access for volume: {}", objInfo);
-      if (isACLTypeCreate || isListAllVolume) {
+      if (isACLTypeCreate) {
         // only admin is allowed to create volume and list all volumes
         return false;
       }
-      return volumeManager.checkAccess(objInfo, context);
+      boolean volumeAccess =  isOwner ||
+          volumeManager.checkAccess(objInfo, context);
+      return volumeAccess;
     case BUCKET:
       LOG.trace("Checking access for bucket: {}", objInfo);
+      // Skip check for volume owner
+      if (isOwner) {
+        return true;
+      }
       // Skip bucket access check for CREATE acl since
       // bucket will not exist at the time of creation
       boolean bucketAccess = isACLTypeCreate
@@ -129,6 +140,10 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
           && volumeManager.checkAccess(objInfo, parentContext));
     case KEY:
       LOG.trace("Checking access for Key: {}", objInfo);
+      // Skip check for volume owner
+      if (isOwner) {
+        return true;
+      }
       // Skip key access check for CREATE acl since
       // key will not exist at the time of creation
       boolean keyAccess = isACLTypeCreate
@@ -139,6 +154,10 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
           && volumeManager.checkAccess(objInfo, parentContext));
     case PREFIX:
       LOG.trace("Checking access for Prefix: {}", objInfo);
+      // Skip check for volume owner
+      if (isOwner) {
+        return true;
+      }
       // Skip prefix access check for CREATE acl since
       // prefix will not exist at the time of creation
       boolean prefixAccess = isACLTypeCreate
@@ -176,6 +195,25 @@ public class OzoneNativeAuthorizer implements IAccessAuthorizer {
     return Collections.unmodifiableCollection(this.ozAdmins);
   }
 
+  public void setAllowListAllVolumes(boolean allowListAllVolumes) {
+    this.allowListAllVolumes = allowListAllVolumes;
+  }
+
+  public boolean getAllowListAllVolumes() {
+    return allowListAllVolumes;
+  }
+
+  private boolean isOwner(UserGroupInformation callerUgi, String ownerName) {
+    if (ownerName == null) {
+      return false;
+    }
+    if (callerUgi.getUserName().equals(ownerName) ||
+        callerUgi.getShortUserName().equals(ownerName)) {
+      return true;
+    }
+    return false;
+  }
+
   private boolean isAdmin(UserGroupInformation callerUgi) {
     if (ozAdmins == null) {
       return false;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
new file mode 100644
index 0000000..cb7471d
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.security.acl;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.ozone.om.BucketManagerImpl;
+import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.PrefixManager;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl;
+import org.apache.hadoop.ozone.om.VolumeManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
+import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Test Ozone owner check from OzoneNativeAuthorizer.
+ */
+public class TestVolumeOwner {
+
+  private static OzoneConfiguration ozoneConfig;
+  private static OzoneNativeAuthorizer nativeAuthorizer;
+  private static KeyManagerImpl keyManager;
+  private static VolumeManagerImpl volumeManager;
+  private static BucketManagerImpl bucketManager;
+  private static PrefixManager prefixManager;
+  private static OMMetadataManager metadataManager;
+  private static UserGroupInformation testUgi;
+
+  /**
+   * One-time fixture for all tests in this class. Points the OM metadata
+   * directory at a randomized test directory, creates the metadata, volume,
+   * bucket, key and prefix managers, builds the native authorizer from them
+   * and finally populates the test volumes, buckets and keys.
+   *
+   * @throws IOException if a manager cannot be created or the test data
+   *                     cannot be written.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    ozoneConfig = new OzoneConfiguration();
+    ozoneConfig.set(OZONE_ACL_AUTHORIZER_CLASS,
+        OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+    File dir = GenericTestUtils.getRandomizedTestDir();
+    ozoneConfig.set(OZONE_METADATA_DIRS, dir.toString());
+
+    // All managers share the same metadata manager instance.
+    metadataManager = new OmMetadataManagerImpl(ozoneConfig);
+    volumeManager = new VolumeManagerImpl(metadataManager, ozoneConfig);
+    bucketManager = new BucketManagerImpl(metadataManager);
+    // The block location protocol is a Mockito mock, not a real client.
+    keyManager = new KeyManagerImpl(mock(ScmBlockLocationProtocol.class),
+        metadataManager, ozoneConfig, "om1", null);
+    prefixManager = new PrefixManagerImpl(metadataManager, false);
+
+    // NOTE(review): the last argument is presumably the admin list, which
+    // would make "om" the only admin - confirm against the constructor.
+    nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
+        keyManager, prefixManager,
+        Collections.singletonList("om"));
+
+    // Identity whose user and group names are used for the test key ACLs.
+    testUgi = UserGroupInformation.createUserForTesting("testuser",
+        new String[]{"test"});
+
+    prepareTestVols();
+    prepareTestBuckets();
+    prepareTestKeys();
+  }
+
+  /**
+   * Adds two volumes to the OM metadata. Volume {@code i} is named by
+   * {@link #getTestVolumeName(int)}, owned by the user returned from
+   * {@link #getTestVolOwnerName(int)} and has "om" as its admin name.
+   */
+  private static void prepareTestVols() throws IOException {
+    for (int volIndex = 0; volIndex < 2; volIndex++) {
+      String volumeName = getTestVolumeName(volIndex);
+      String ownerName = getTestVolOwnerName(volIndex);
+      TestOMRequestUtils.addVolumeToOM(metadataManager,
+          OmVolumeArgs.newBuilder()
+              .setVolume(volumeName)
+              .setAdminName("om")
+              .setOwnerName(ownerName)
+              .build());
+    }
+  }
+
+  /**
+   * Adds two buckets under each of the two test volumes to the OM metadata.
+   */
+  private static void prepareTestBuckets() throws IOException {
+    for (int volIndex = 0; volIndex < 2; volIndex++) {
+      String volumeName = getTestVolumeName(volIndex);
+      for (int bucketIndex = 0; bucketIndex < 2; bucketIndex++) {
+        String bucketName = getTestBucketName(bucketIndex);
+        TestOMRequestUtils.addBucketToOM(metadataManager,
+            OmBucketInfo.newBuilder()
+                .setVolumeName(volumeName)
+                .setBucketName(bucketName)
+                .build());
+      }
+    }
+  }
+
+  /**
+   * Creates and commits two empty keys under every test bucket. The first
+   * key of each bucket carries ALL rights for the test user and its groups,
+   * the second one carries NONE.
+   */
+  private static void prepareTestKeys() throws IOException {
+    for (int volIndex = 0; volIndex < 2; volIndex++) {
+      for (int bucketIndex = 0; bucketIndex < 2; bucketIndex++) {
+        for (int keyIndex = 0; keyIndex < 2; keyIndex++) {
+          OmKeyArgs.Builder argsBuilder = new OmKeyArgs.Builder()
+              .setVolumeName(getTestVolumeName(volIndex))
+              .setBucketName(getTestBucketName(bucketIndex))
+              .setKeyName(getTestKeyName(keyIndex))
+              .setFactor(HddsProtos.ReplicationFactor.ONE)
+              .setDataSize(0)
+              .setType(HddsProtos.ReplicationType.STAND_ALONE);
+          // Same right is used for both the user and the group entries.
+          IAccessAuthorizer.ACLType rights = keyIndex == 0 ? ALL : NONE;
+          argsBuilder.setAcls(OzoneAclUtil.getAclList(
+              testUgi.getUserName(), testUgi.getGroupNames(), rights, rights));
+
+          OmKeyArgs args = argsBuilder.build();
+          OpenKeySession session = keyManager.createFile(args, true, false);
+          // Commit with the block locations handed out at create time.
+          args.setLocationInfoList(
+              session.getKeyInfo().getLatestVersionLocations()
+                  .getLocationList());
+          keyManager.commitKey(args, session.getId());
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks volume-level authorization for the combinations of admin and
+   * volume owner: CREATE on a volume is granted to the admin "om" (with or
+   * without an owner name in the request) and denied to everyone else,
+   * including the volume owner; every other ACL type except NONE is granted
+   * to the volume owner.
+   */
+  @Test
+  public void testVolumeOps() throws Exception {
+    OzoneObj vol0 = getTestVolumeobj(0);
+
+    // admin = true, owner = false, ownerName = testvolumeOwner
+    RequestContext nonOwnerContext = getUserRequestContext("om",
+        IAccessAuthorizer.ACLType.CREATE, false, getTestVolOwnerName(0));
+    Assert.assertTrue("matching admins are allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0, nonOwnerContext));
+
+    // admin = true, owner = false, ownerName = null
+    // A dedicated context is required here: reusing nonOwnerContext would
+    // only repeat the previous assertion and never exercise a null owner.
+    RequestContext nullOwnerNameContext = getUserRequestContext("om",
+        IAccessAuthorizer.ACLType.CREATE, false, null);
+    Assert.assertTrue("matching admins are allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0,
+        nullOwnerNameContext));
+
+    // admin = false, owner = false, ownerName = testvolumeOwner
+    RequestContext nonAdminNonOwnerContext = getUserRequestContext("testuser",
+        IAccessAuthorizer.ACLType.CREATE, false, getTestVolOwnerName(0));
+    Assert.assertFalse("mismatching admins are not allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0,
+        nonAdminNonOwnerContext));
+
+    // admin = false, owner = true
+    RequestContext nonAdminOwnerContext = getUserRequestContext(
+        getTestVolOwnerName(0), IAccessAuthorizer.ACLType.CREATE,
+        true, getTestVolOwnerName(0));
+    Assert.assertFalse("mismatching admins are not allowed to perform admin " +
+        "operations even for owner", nativeAuthorizer.checkAccess(vol0,
+        nonAdminOwnerContext));
+
+    // CREATE was covered above and NONE is not a real operation.
+    List<IAccessAuthorizer.ACLType> aclsToTest =
+        Arrays.stream(IAccessAuthorizer.ACLType.values()).filter(
+            (type)-> type != NONE && type != CREATE)
+            .collect(Collectors.toList());
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      nonAdminOwnerContext = getUserRequestContext(getTestVolOwnerName(0),
+          type, true, getTestVolOwnerName(0));
+      Assert.assertTrue("Owner is allowed to perform all non-admin " +
+          "operations", nativeAuthorizer.checkAccess(vol0,
+          nonAdminOwnerContext));
+    }
+  }
+
+  @Test
+  public void testBucketOps() throws Exception {
+    OzoneObj obj = getTestBucketobj(1, 1);
+    List<IAccessAuthorizer.ACLType> aclsToTest = getAclsToTest();
+
+    // admin = false, owner = true
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(1), type, true, getTestVolOwnerName(1));
+      Assert.assertTrue("non admin volume owner without acls are allowed" +
+          " to do " + type + " on bucket",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+
+    // admin = false, owner = false
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(1), type, false, getTestVolOwnerName(0));
+      Assert.assertFalse("non admin non volume owner without acls" +
+          " are not allowed to do " + type + " on bucket",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+  }
+
+  /**
+   * Checks key-level authorization for a non-admin caller on a key whose
+   * own ACLs were created with NONE rights: every ACL type except NONE is
+   * granted when the caller is the volume owner named in the request and
+   * denied when the request names a different owner.
+   */
+  @Test
+  public void testKeyOps() throws Exception {
+    OzoneObj obj = getTestKeyobj(0, 0, 1);
+    List<IAccessAuthorizer.ACLType> aclsToTest = getAclsToTest();
+
+    // admin = false, owner = true
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(0), type, true, getTestVolOwnerName(0));
+      Assert.assertTrue("non admin volume owner without acls are allowed" +
+              " to do " + type + " on key",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+
+    // admin = false, owner = false
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminNonOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(0), type, false, getTestVolOwnerName(1));
+      // The message names the non-owner case that is actually tested here.
+      Assert.assertFalse("non admin non volume owner without acls" +
+              " are not allowed to do " + type + " on key",
+          nativeAuthorizer.checkAccess(obj, nonAdminNonOwnerContext));
+    }
+  }
+
+  /**
+   * Builds a request context for a remote user.
+   *
+   * @param username  name of the remote user issuing the request.
+   * @param type      ACL type being requested.
+   * @param isOwner   not read by this method; callers pass it to state
+   *                  whether {@code username} is meant to be the owner.
+   * @param ownerName owner name placed in the context.
+   * @return the built request context.
+   */
+  private RequestContext getUserRequestContext(String username,
+      IAccessAuthorizer.ACLType type, boolean isOwner, String ownerName) {
+    UserGroupInformation remoteUser =
+        UserGroupInformation.createRemoteUser(username);
+    return RequestContext
+        .getBuilder(remoteUser, null, null, type, ownerName)
+        .build();
+  }
+
+  /** Returns the test volume name for the given index, e.g. "vol0". */
+  private static String getTestVolumeName(int index) {
+    return "vol" + index;
+  }
+
+  /** Returns the owner name for the given volume index, e.g. "owner0". */
+  private static String getTestVolOwnerName(int index) {
+    return "owner" + index;
+  }
+
+  /** Returns the test bucket name for the given index, e.g. "bucket0". */
+  private static String getTestBucketName(int index) {
+    return "bucket" + index;
+  }
+
+  /** Returns the test key name for the given index, e.g. "key0". */
+  private static String getTestKeyName(int index) {
+    return "key" + index;
+  }
+
+  /**
+   * Builds the OZONE-store VOLUME object for the test volume with the
+   * given index; bucket and key names are passed as {@code null}.
+   */
+  private OzoneObj getTestVolumeobj(int index) {
+    String volumeName = getTestVolumeName(index);
+    return OzoneObjInfo.Builder
+        .getBuilder(OzoneObj.ResourceType.VOLUME, OzoneObj.StoreType.OZONE,
+            volumeName, null, null)
+        .build();
+  }
+
+  /**
+   * Builds the OZONE-store BUCKET object for the given volume and bucket
+   * indexes.
+   */
+  private OzoneObj getTestBucketobj(int volIndex, int bucketIndex) {
+    String volumeName = getTestVolumeName(volIndex);
+    String bucketName = getTestBucketName(bucketIndex);
+    return OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.BUCKET)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .build();
+  }
+
+  /**
+   * Builds the OZONE-store KEY object for the given volume, bucket and key
+   * indexes.
+   */
+  private OzoneObj getTestKeyobj(int volIndex, int bucketIndex,
+      int keyIndex) {
+    String volumeName = getTestVolumeName(volIndex);
+    String bucketName = getTestBucketName(bucketIndex);
+    String keyName = getTestKeyName(keyIndex);
+    return OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+  }
+
+  /**
+   * Lists every ACL type except {@code NONE}, in declaration order.
+   *
+   * @return the ACL types exercised by the bucket and key tests.
+   */
+  List<IAccessAuthorizer.ACLType> getAclsToTest() {
+    IAccessAuthorizer.ACLType[] allTypes = IAccessAuthorizer.ACLType.values();
+    return Arrays.stream(allTypes)
+        .filter(aclType -> aclType != NONE)
+        .collect(Collectors.toList());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to