This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 091993b  HDDS-3056. Allow users to list volumes they have access to, 
and optionally allow all users to list all volumes (#696)
091993b is described below

commit 091993bc7186a89265286a0a5773c4b67cb19f94
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Apr 21 12:42:52 2020 -0700

    HDDS-3056. Allow users to list volumes they have access to, and optionally 
allow all users to list all volumes (#696)
---
 .../common/src/main/resources/ozone-default.xml    |  10 +
 hadoop-hdds/docs/content/shell/VolumeCommands.md   |   6 +-
 .../docs/content/shell/VolumeCommands.zh.md        |   5 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   3 +
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   2 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   2 +-
 .../src/main/compose/ozonesecure/docker-config     |   1 +
 .../ozone/om/TestOzoneManagerListVolumes.java      | 238 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 111 +++++++---
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  |  22 +-
 .../ozone/shell/volume/ListVolumeHandler.java      |  11 +-
 11 files changed, 354 insertions(+), 57 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 04984fe..04a61cb 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -584,6 +584,16 @@
     </description>
   </property>
   <property>
+    <name>ozone.om.volume.listall.allowed</name>
+    <value>true</value>
+    <tag>OM, MANAGEMENT</tag>
+    <description>
+      Allows everyone to list all volumes when set to true. Defaults to true.
+      When set to false, non-admin users can only list the volumes they have
+      access to. Admins can always list all volumes.
+    </description>
+  </property>
+  <property>
     <name>ozone.om.user.max.volume</name>
     <value>1024</value>
     <tag>OM, MANAGEMENT</tag>
diff --git a/hadoop-hdds/docs/content/shell/VolumeCommands.md 
b/hadoop-hdds/docs/content/shell/VolumeCommands.md
index 47fb985..fe459f3 100644
--- a/hadoop-hdds/docs/content/shell/VolumeCommands.md
+++ b/hadoop-hdds/docs/content/shell/VolumeCommands.md
@@ -85,13 +85,15 @@ The above command will print out the information about hive 
volume.
 
 ### List
 
-The `volume list` command will list the volumes owned by a user.
+The `volume list` command will list the volumes accessible by a user.
 
 {{< highlight bash >}}
 ozone sh volume list --user hadoop
 {{< /highlight >}}
 
-The above command will print out all the volumes owned by the user hadoop.
+When ACL is enabled, the above command will print out volumes that the user
+hadoop has LIST permission to. When ACL is disabled, the above command will
+print out all the volumes owned by the user hadoop.
 
 ### Update
 
diff --git a/hadoop-hdds/docs/content/shell/VolumeCommands.zh.md 
b/hadoop-hdds/docs/content/shell/VolumeCommands.zh.md
index b4a4c28..190e099 100644
--- a/hadoop-hdds/docs/content/shell/VolumeCommands.zh.md
+++ b/hadoop-hdds/docs/content/shell/VolumeCommands.zh.md
@@ -80,13 +80,14 @@ ozone sh volume info /hive
 
 ### 列举
 
-`volume list` 命令用来列举一个用户拥有的所有卷。
+`volume list` 命令用来列举一个用户可以访问的所有卷。
 
 {{< highlight bash >}}
 ozone sh volume list --user hadoop
 {{< /highlight >}}
 
-上述命令会打印出 hadoop 用户拥有的所有卷。
+若 ACL 已启用,上述命令会打印出 hadoop 用户有 LIST 权限的所有卷。
+若 ACL 被禁用,上述命令会打印出 hadoop 用户拥有的所有卷。
 
 ### 更新
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index f46b308..7af6627 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -76,6 +76,9 @@ public final class OMConfigKeys {
       "ozone.om.db.cache.size.mb";
   public static final int OZONE_OM_DB_CACHE_SIZE_DEFAULT = 128;
 
+  public static final String OZONE_OM_VOLUME_LISTALL_ALLOWED =
+      "ozone.om.volume.listall.allowed";
+  public static final boolean OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT = true;
   public static final String OZONE_OM_USER_MAX_VOLUME =
       "ozone.om.user.max.volume";
   public static final int OZONE_OM_USER_MAX_VOLUME_DEFAULT = 1024;
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index ff31773..34943cb 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -118,7 +118,7 @@ public interface OzoneManagerProtocol
   void deleteVolume(String volume) throws IOException;
 
   /**
-   * Lists volume owned by a specific user.
+   * Lists volumes accessible by a specific user.
    * @param userName - user name
    * @param prefix  - Filter prefix -- Return only entries that match this.
    * @param prevKey - Previous key -- List starts from the next from the 
prevkey
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index a95d45e..674f672 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -573,7 +573,7 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
   }
 
   /**
-   * Lists volume owned by a specific user.
+   * Lists volumes accessible by a specific user.
    *
    * @param userName - user name
    * @param prefix - Filter prefix -- Return only entries that match this.
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config 
b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index df34dd7..6184b42 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
+OZONE-SITE.XML_ozone.om.volume.listall.allowed=false
 
 OZONE-SITE.XML_ozone.om.address=om
 OZONE-SITE.XML_ozone.om.http-address=om:9874
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
new file mode 100644
index 0000000..d5b0efc
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Test OzoneManager list volume operation under combinations of configs.
+ */
+public class TestOzoneManagerListVolumes {
+
+  @Rule
+  public Timeout timeout = new Timeout(120_000);
+
+  private UserGroupInformation loginUser;
+  private UserGroupInformation user1 =
+      UserGroupInformation.createRemoteUser("user1");  // Admin user
+  private UserGroupInformation user2 =
+      UserGroupInformation.createRemoteUser("user2");  // Non-admin user
+
+  @Before
+  public void init() throws Exception {
+    loginUser = UserGroupInformation.getLoginUser();
+  }
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   */
+  private MiniOzoneCluster startCluster(boolean aclEnabled,
+      boolean volListAllAllowed) throws Exception {
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String clusterId = UUID.randomUUID().toString();
+    String scmId = UUID.randomUUID().toString();
+    String omId = UUID.randomUUID().toString();
+    conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+    conf.set(OZONE_ADMINISTRATORS, "user1");
+    conf.setInt(OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
+
+    // Use native impl here, default impl doesn't do actual checks
+    conf.set(OZONE_ACL_AUTHORIZER_CLASS, OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+    // Note: OM doesn't support live config reloading
+    conf.setBoolean(OZONE_ACL_ENABLED, aclEnabled);
+    conf.setBoolean(OZONE_OM_VOLUME_LISTALL_ALLOWED, volListAllAllowed);
+
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+        .setClusterId(clusterId).setScmId(scmId).setOmId(omId).build();
+    cluster.waitForClusterToBeReady();
+
+    // loginUser is the user running this test.
+    // Implication: loginUser is automatically added to the OM admin list.
+    UserGroupInformation.setLoginUser(loginUser);
+    // Create volumes with non-default owners and ACLs
+    OzoneClient client = cluster.getClient();
+    ObjectStore objectStore = client.getObjectStore();
+
+    /* r = READ, w = WRITE, c = CREATE, d = DELETE
+       l = LIST, a = ALL, n = NONE, x = READ_ACL, y = WRITE_ACL */
+    String aclUser1All = "user:user1:a";
+    String aclUser2All = "user:user2:a";
+    String aclWorldAll = "world::a";
+    createVolumeWithOwnerAndAcl(objectStore, "volume1", "user1", aclUser1All);
+    createVolumeWithOwnerAndAcl(objectStore, "volume2", "user2", aclUser2All);
+    createVolumeWithOwnerAndAcl(objectStore, "volume3", "user1", aclUser2All);
+    createVolumeWithOwnerAndAcl(objectStore, "volume4", "user2", aclUser1All);
+    createVolumeWithOwnerAndAcl(objectStore, "volume5", "user1", aclWorldAll);
+
+    return cluster;
+  }
+
+  private void stopCluster(MiniOzoneCluster cluster) {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private void createVolumeWithOwnerAndAcl(ObjectStore objectStore,
+      String volumeName, String ownerName, String aclString)
+      throws IOException {
+    ClientProtocol proxy = objectStore.getClientProxy();
+    objectStore.createVolume(volumeName);
+    proxy.setVolumeOwner(volumeName, ownerName);
+    setVolumeAcl(objectStore, volumeName, aclString);
+  }
+
+  /**
+   * Helper function to set volume ACL.
+   */
+  private void setVolumeAcl(ObjectStore objectStore, String volumeName,
+      String aclString) throws IOException {
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder().setVolumeName(volumeName)
+        .setResType(OzoneObj.ResourceType.VOLUME).setStoreType(OZONE).build();
+    Assert.assertTrue(objectStore.setAcl(obj, OzoneAcl.parseAcls(aclString)));
+  }
+
+  /**
+   * Helper function to reduce code redundancy for test checks with each user
+   * under different config combination.
+   */
+  private void checkUser(MiniOzoneCluster cluster, UserGroupInformation user,
+      List<String> expectVol, boolean expectListAllSuccess) throws IOException {
+
+    UserGroupInformation.setLoginUser(user);
+    OzoneClient client = cluster.getClient();
+    ObjectStore objectStore = client.getObjectStore();
+
+    // `ozone sh volume list` shall return volumes with LIST permission of user.
+    Iterator<? extends OzoneVolume> it = objectStore.listVolumesByUser(
+        null, "", "");
+    Set<String> accessibleVolumes = new HashSet<>();
+    while (it.hasNext()) {
+      OzoneVolume vol = it.next();
+      String volumeName = vol.getName();
+      accessibleVolumes.add(volumeName);
+    }
+    Assert.assertEquals(new HashSet<>(expectVol), accessibleVolumes);
+
+    // `ozone sh volume list --all` returns all volumes,
+    //  or throws exception (for non-admin if acl enabled & listall disallowed).
+    if (expectListAllSuccess) {
+      it = objectStore.listVolumes("volume");
+      int count = 0;
+      while (it.hasNext()) {
+        it.next();
+        count++;
+      }
+      Assert.assertEquals(5, count);
+    } else {
+      try {
+        objectStore.listVolumes("volume");
+        Assert.fail("listAllVolumes should fail for " + user.getUserName());
+      } catch (RuntimeException ex) {
+        // Current listAllVolumes throws RuntimeException
+        if (ex.getCause() instanceof OMException) {
+          // Expect PERMISSION_DENIED
+          if (((OMException) ex.getCause()).getResult() !=
+              OMException.ResultCodes.PERMISSION_DENIED) {
+            throw ex;
+          }
+        } else {
+          throw ex;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testAclEnabledListAllAllowed() throws Exception {
+    // ozone.acl.enabled = true, ozone.om.volume.listall.allowed = true
+    MiniOzoneCluster cluster = startCluster(true, true);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
+        true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
+        true);
+    stopCluster(cluster);
+  }
+
+  @Test
+  public void testAclEnabledListAllDisallowed() throws Exception {
+    // ozone.acl.enabled = true, ozone.om.volume.listall.allowed = false
+    MiniOzoneCluster cluster = startCluster(true, false);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
+        true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
+        false);
+    stopCluster(cluster);
+  }
+
+  @Test
+  public void testAclDisabledListAllAllowed() throws Exception {
+    // ozone.acl.enabled = false, ozone.om.volume.listall.allowed = true
+    MiniOzoneCluster cluster = startCluster(false, true);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume5"),
+        true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume4"),
+        true);
+    stopCluster(cluster);
+  }
+
+  @Test
+  public void testAclDisabledListAllDisallowed() throws Exception {
+    // ozone.acl.enabled = false, ozone.om.volume.listall.allowed = false
+    MiniOzoneCluster cluster = startCluster(false, false);
+    checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume5"),
+        true);
+    checkUser(cluster, user2, Arrays.asList("volume2", "volume4"),
+        true);  // listall will succeed since acl is disabled
+    stopCluster(cluster);
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 67606bf..217d22c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -202,6 +202,8 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_F
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
@@ -295,6 +297,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private static String keyProviderUriKeyName =
       CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
 
+  private boolean allowListAllVolumes;
   // Adding parameters needed for VolumeRequests here, so that during request
   // execution, we can get from ozoneManager.
   private long maxUserVolumeCount;
@@ -341,6 +344,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
     loginOMUserIfSecurityEnabled(conf);
 
+    this.allowListAllVolumes = conf.getBoolean(OZONE_OM_VOLUME_LISTALL_ALLOWED,
+        OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT);
     this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME,
         OZONE_OM_USER_MAX_VOLUME_DEFAULT);
     Preconditions.checkArgument(this.maxUserVolumeCount > 0,
@@ -421,11 +426,9 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     }
 
     instantiateServices();
-
     this.omRatisSnapshotInfo = new OMRatisSnapshotInfo(
         omStorage.getCurrentDir());
     initializeRatisServer();
-
     if (isRatisEnabled) {
       // Create Ratis storage dir
       String omRatisDirectory =
@@ -446,7 +449,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     }
 
     metrics = OMMetrics.create();
-
     omClientProtocolMetrics = ProtocolMessageMetrics
         .create("OmClientProtocol", "Ozone Manager RPC endpoint",
             OzoneManagerProtocolProtos.Type.values());
@@ -1595,7 +1597,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    * @param vol     - name of volume
    * @param bucket  - bucket name
    * @param key     - key
-   * @throws OMException
+   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
    */
   private void checkAcls(ResourceType resType, StoreType store,
       ACLType acl, String vol, String bucket, String key)
@@ -1603,27 +1605,54 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     checkAcls(resType, store, acl, vol, bucket, key,
         ProtobufRpcEngine.Server.getRemoteUser(),
         ProtobufRpcEngine.Server.getRemoteIp(),
-        ProtobufRpcEngine.Server.getRemoteIp().getHostName());
+        ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
+        true);
+  }
+
+  /**
+   * A variant of checkAcls that doesn't throw exception if permission denied.
+   * @return true if permission granted, false if permission denied.
+   */
+  private boolean hasAcls(ResourceType resType, StoreType store,
+      ACLType acl, String vol, String bucket, String key) {
+    try {
+      return checkAcls(resType, store, acl, vol, bucket, key,
+          ProtobufRpcEngine.Server.getRemoteUser(),
+          ProtobufRpcEngine.Server.getRemoteIp(),
+          ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
+          false);
+    } catch (OMException ex) {
+      // Should not trigger exception here at all
+      return false;
+    }
   }
 
+
   /**
    * CheckAcls for the ozone object.
-   * @param resType
-   * @param storeType
-   * @param aclType
-   * @param vol
-   * @param bucket
-   * @param key
-   * @param ugi
-   * @param remoteAddress
-   * @param hostName
-   * @throws OMException
+   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
    */
   @SuppressWarnings("parameternumber")
   public void checkAcls(ResourceType resType, StoreType storeType,
       ACLType aclType, String vol, String bucket, String key,
       UserGroupInformation ugi, InetAddress remoteAddress, String hostName)
       throws OMException {
+    checkAcls(resType, storeType, aclType, vol, bucket, key,
+        ugi, remoteAddress, hostName, true);
+  }
+
+  /**
+   * CheckAcls for the ozone object.
+   * @return true if permission granted, false if permission denied.
+   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
+   *                     and throwIfPermissionDenied set to true.
+   */
+  @SuppressWarnings("parameternumber")
+  private boolean checkAcls(ResourceType resType, StoreType storeType,
+      ACLType aclType, String vol, String bucket, String key,
+      UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
+      boolean throwIfPermissionDenied)
+      throws OMException {
     OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
         .setResType(resType)
         .setStoreType(storeType)
@@ -1638,11 +1667,16 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
         .setAclRights(aclType)
         .build();
     if (!accessAuthorizer.checkAccess(obj, context)) {
-      LOG.warn("User {} doesn't have {} permission to access {} /{}/{}/{}",
-          ugi.getUserName(), aclType, resType, vol, bucket, key);
-      throw new OMException("User " + ugi.getUserName() + " doesn't " +
-          "have " + aclType + " permission to access " + resType,
-          ResultCodes.PERMISSION_DENIED);
+      if (throwIfPermissionDenied) {
+        LOG.warn("User {} doesn't have {} permission to access {} /{}/{}/{}",
+            ugi.getUserName(), aclType, resType, vol, bucket, key);
+        throw new OMException("User " + ugi.getUserName() + " doesn't have " +
+            aclType + " permission to access " + resType,
+            ResultCodes.PERMISSION_DENIED);
+      }
+      return false;
+    } else {
+      return true;
     }
   }
 
@@ -1725,7 +1759,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   @Override
   public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
       throws IOException {
-    if(isAclEnabled) {
+    if (isAclEnabled) {
       checkAcls(ResourceType.VOLUME, StoreType.OZONE,
           ACLType.READ, volume, null, null);
     }
@@ -1810,7 +1844,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   /**
-   * Lists volume owned by a specific user.
+   * Lists volumes accessible by a specific user.
    *
    * @param userName - user name
    * @param prefix - Filter prefix -- Return only entries that match this.
@@ -1823,13 +1857,13 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   @Override
   public List<OmVolumeArgs> listVolumeByUser(String userName, String prefix,
       String prevKey, int maxKeys) throws IOException {
-    if(isAclEnabled) {
-      UserGroupInformation remoteUserUgi = ProtobufRpcEngine.Server.
-          getRemoteUser();
+    UserGroupInformation remoteUserUgi =
+        ProtobufRpcEngine.Server.getRemoteUser();
+    if (isAclEnabled) {
       if (remoteUserUgi == null) {
         LOG.error("Rpc user UGI is null. Authorization failed.");
-        throw new OMException("Rpc user UGI is null. Authorization " +
-            "failed.", ResultCodes.PERMISSION_DENIED);
+        throw new OMException("Rpc user UGI is null. Authorization failed.",
+            ResultCodes.PERMISSION_DENIED);
       }
     }
     boolean auditSuccess = true;
@@ -1840,7 +1874,23 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     auditMap.put(OzoneConsts.USERNAME, userName);
     try {
       metrics.incNumVolumeLists();
-      return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
+      if (isAclEnabled) {
+        // List all volumes first
+        List<OmVolumeArgs> listAllVolumes = volumeManager.listVolumes(
+            null, prefix, prevKey, maxKeys);
+        List<OmVolumeArgs> result = new ArrayList<>();
+        // Filter all volumes by LIST ACL
+        for (OmVolumeArgs volumeArgs : listAllVolumes) {
+          if (hasAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST,
+              volumeArgs.getVolume(), null, null)) {
+            result.add(volumeArgs);
+          }
+        }
+        return result;
+      } else {
+        // When ACL is not enabled, fallback to filter by owner
+        return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
+      }
     } catch (Exception ex) {
       metrics.incNumVolumeListFails();
       auditSuccess = false;
@@ -1876,7 +1926,10 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     auditMap.put(OzoneConsts.USERNAME, null);
     try {
       metrics.incNumVolumeLists();
-      checkAdmin();
+      if (!allowListAllVolumes) {
+        // Only admin can list all volumes when disallowed in config
+        checkAdmin();
+      }
       return volumeManager.listVolumes(null, prefix, prevKey, maxKeys);
     } catch (Exception ex) {
       metrics.incNumVolumeListFails();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
index 9d9cdab..f7d730a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
@@ -20,10 +20,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -32,10 +30,8 @@ import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.UserVolumeInfo;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
 import com.google.common.base.Preconditions;
@@ -453,23 +449,11 @@ public class VolumeManagerImpl implements VolumeManager {
   @Override
   public List<OmVolumeArgs> listVolumes(String userName,
       String prefix, String startKey, int maxKeys) throws IOException {
-    metadataManager.getLock().acquireLock(USER_LOCK, userName);
+    metadataManager.getLock().acquireReadLock(USER_LOCK, userName);
     try {
-      List<OmVolumeArgs> volumes = metadataManager.listVolumes(
-          userName, prefix, startKey, maxKeys);
-      UserGroupInformation userUgi = ProtobufRpcEngine.Server.
-          getRemoteUser();
-      if (userUgi == null || !aclEnabled) {
-        return volumes;
-      }
-
-      List<OmVolumeArgs> filteredVolumes = volumes.stream().
-          filter(v -> v.getAclMap().
-              hasAccess(IAccessAuthorizer.ACLType.LIST, userUgi))
-          .collect(Collectors.toList());
-      return filteredVolumes;
+      return metadataManager.listVolumes(userName, prefix, startKey, maxKeys);
     } finally {
-      metadataManager.getLock().releaseLock(USER_LOCK, userName);
+      metadataManager.getLock().releaseReadLock(USER_LOCK, userName);
     }
   }
 
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/volume/ListVolumeHandler.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/volume/ListVolumeHandler.java
index 78f0e4f..fb0760e 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/volume/ListVolumeHandler.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/volume/ListVolumeHandler.java
@@ -52,9 +52,14 @@ public class ListVolumeHandler extends Handler {
   private ListOptions listOptions;
 
   @Option(names = {"--user", "-u"},
-      description = "Owner of the volumes to list.")
+      description = "List accessible volumes of the user. This will be ignored"
+          + " if list all volumes option is specified.")
   private String userName;
 
+  @Option(names = {"--all", "-a"},
+      description = "List all volumes.")
+  private boolean listAllVolumes;
+
   @Override
   protected OzoneAddress getAddress() throws OzoneClientException {
     OzoneAddress address = new OzoneAddress(uri);
@@ -71,12 +76,12 @@ public class ListVolumeHandler extends Handler {
     }
 
     Iterator<? extends OzoneVolume> volumeIterator;
-    if (userName != null) {
+    if (userName != null && !listAllVolumes) {
       volumeIterator = client.getObjectStore().listVolumesByUser(userName,
           listOptions.getPrefix(), listOptions.getStartItem());
     } else {
       volumeIterator = client.getObjectStore().listVolumes(
-          listOptions.getPrefix());
+          listOptions.getPrefix(), listOptions.getStartItem());
     }
 
     int counter = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to