This is an automated email from the ASF dual-hosted git repository.
xyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 6112603 HDDS-4088. Adding Owner info for Authorizer plugin to honor
owner access rights (#1395)
6112603 is described below
commit 6112603aca864bf18243fcfceb1a330ae7ac0587
Author: Xiaoyu Yao <[email protected]>
AuthorDate: Mon Oct 26 10:56:19 2020 -0700
HDDS-4088. Adding Owner info for Authorizer plugin to honor owner access
rights (#1395)
---
.../hadoop/ozone/security/acl/OzoneObjInfo.java | 10 +
.../hadoop/ozone/security/acl/RequestContext.java | 48 +++-
.../org/apache/hadoop/ozone/om/TestOmAcls.java | 9 +-
.../ozone/om/TestOzoneManagerListVolumes.java | 36 +--
.../org/apache/hadoop/ozone/om/OzoneManager.java | 72 +++--
.../hadoop/ozone/om/request/OMClientRequest.java | 26 +-
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 34 +++
.../ozone/om/request/key/OMKeysDeleteRequest.java | 4 +-
.../ozone/om/request/key/OMKeysRenameRequest.java | 9 +-
.../request/key/acl/prefix/OMPrefixAclRequest.java | 7 +
.../hadoop/ozone/om/request/util/ObjectParser.java | 6 +-
.../ozone/security/acl/OzoneNativeAuthorizer.java | 44 ++-
.../hadoop/ozone/security/acl/TestVolumeOwner.java | 298 +++++++++++++++++++++
13 files changed, 552 insertions(+), 51 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
index cbae18c..42ddbb9 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
@@ -159,6 +159,16 @@ public final class OzoneObjInfo extends OzoneObj {
return new Builder();
}
+ public static Builder getBuilder(ResourceType resType,
+ StoreType storeType, String vol, String bucket, String key) {
+ return OzoneObjInfo.Builder.newBuilder()
+ .setResType(resType)
+ .setStoreType(storeType)
+ .setVolumeName(vol)
+ .setBucketName(bucket)
+ .setKeyName(key);
+ }
+
public static Builder fromKeyArgs(OmKeyArgs args) {
return new Builder()
.setVolumeName(args.getVolumeName())
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
index 3295827..043cd55 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.security.acl;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -32,16 +33,20 @@ public class RequestContext {
private final String serviceId;
private final ACLIdentityType aclType;
private final ACLType aclRights;
+ private final String ownerName;
+ @SuppressWarnings("parameternumber")
public RequestContext(String host, InetAddress ip,
UserGroupInformation clientUgi, String serviceId,
- ACLIdentityType aclType, ACLType aclRights) {
+ ACLIdentityType aclType, ACLType aclRights,
+ String ownerName) {
this.host = host;
this.ip = ip;
this.clientUgi = clientUgi;
this.serviceId = serviceId;
this.aclType = aclType;
this.aclRights = aclRights;
+ this.ownerName = ownerName;
}
/**
@@ -55,6 +60,12 @@ public class RequestContext {
private IAccessAuthorizer.ACLIdentityType aclType;
private IAccessAuthorizer.ACLType aclRights;
+ /**
+ * ownerName is specially added to allow
+ * authorizer to honor owner privilege.
+ */
+ private String ownerName;
+
public Builder setHost(String bHost) {
this.host = bHost;
return this;
@@ -80,14 +91,23 @@ public class RequestContext {
return this;
}
+ public ACLType getAclRights() {
+ return this.aclRights;
+ }
+
public Builder setAclRights(ACLType aclRight) {
this.aclRights = aclRight;
return this;
}
+ public Builder setOwnerName(String owner) {
+ this.ownerName = owner;
+ return this;
+ }
+
public RequestContext build() {
return new RequestContext(host, ip, clientUgi, serviceId, aclType,
- aclRights);
+ aclRights, ownerName);
}
}
@@ -95,6 +115,27 @@ public class RequestContext {
return new Builder();
}
+ public static RequestContext.Builder getBuilder(
+ UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
+ ACLType aclType, String ownerName) {
+ RequestContext.Builder contextBuilder = RequestContext.newBuilder()
+ .setClientUgi(ugi)
+ .setIp(remoteAddress)
+ .setHost(hostName)
+ .setAclType(ACLIdentityType.USER)
+ .setAclRights(aclType)
+ .setOwnerName(ownerName);
+ return contextBuilder;
+ }
+
+ public static RequestContext.Builder getBuilder(UserGroupInformation ugi,
+ ACLType aclType, String ownerName) {
+ return getBuilder(ugi,
+ ProtobufRpcEngine.Server.getRemoteIp(),
+ ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
+ aclType, ownerName);
+ }
+
public String getHost() {
return host;
}
@@ -119,4 +160,7 @@ public class RequestContext {
return aclRights;
}
+ public String getOwnerName() {
+ return ownerName;
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
index 34303ff..271109f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
@@ -116,7 +116,14 @@ public class TestOmAcls {
String volumeName = RandomStringUtils.randomAlphabetic(5).toLowerCase();
String bucketName = RandomStringUtils.randomAlphabetic(5).toLowerCase();
- cluster.getClient().getObjectStore().createVolume(volumeName);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner("user" + RandomStringUtils.randomNumeric(5))
+ .setAdmin("admin" + RandomStringUtils.randomNumeric(5))
+ .build();
+
+ cluster.getClient().getObjectStore().createVolume(volumeName,
+ createVolumeArgs);
OzoneVolume volume =
cluster.getClient().getObjectStore().getVolume(volumeName);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
index d7aaf37..a47aa08 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerListVolumes.java
@@ -201,16 +201,16 @@ public class TestOzoneManagerListVolumes {
// Login as user1, list other users' volumes
UserGroupInformation.setLoginUser(user1);
- checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
- true);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+ "volume5"), true);
// Add "s3v" created default by OM.
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2",
"volume3",
"volume4", "volume5", "s3v"), true);
UserGroupInformation.setLoginUser(user2);
- checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
- true);
+ checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+ "volume5"), true);
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2",
"volume3",
"volume4", "volume5", "s3v"), true);
@@ -229,18 +229,18 @@ public class TestOzoneManagerListVolumes {
// Login as user1, list other users' volumes, expect failure
UserGroupInformation.setLoginUser(user1);
- checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
- false);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+ "volume5"), false);
// Add "s3v" created default by OM.
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2",
"volume3",
"volume4", "volume5", "s3v"), false);
// While admin should be able to list volumes just fine.
UserGroupInformation.setLoginUser(adminUser);
- checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
- true);
- checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
- true);
+ checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+ "volume5"), true);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+ "volume5"), true);
stopCluster(cluster);
}
@@ -249,10 +249,10 @@ public class TestOzoneManagerListVolumes {
public void testAclEnabledListAllAllowed() throws Exception {
// ozone.acl.enabled = true, ozone.om.volume.listall.allowed = true
MiniOzoneCluster cluster = startCluster(true, true);
- checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
- true);
- checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
- true);
+ checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+ "volume5"), true);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+ "volume5"), true);
// Add "s3v" created default by OM.
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2",
"volume3",
@@ -267,11 +267,11 @@ public class TestOzoneManagerListVolumes {
// The default user is adminUser as set in init(),
// listall always succeeds if we use that UGI, we should use non-admin here
UserGroupInformation.setLoginUser(user1);
- checkUser(cluster, user1, Arrays.asList("volume1", "volume4", "volume5"),
- false);
+ checkUser(cluster, user1, Arrays.asList("volume1", "volume3", "volume4",
+ "volume5"), false);
UserGroupInformation.setLoginUser(user2);
- checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume5"),
- false);
+ checkUser(cluster, user2, Arrays.asList("volume2", "volume3", "volume4",
+ "volume5"), false);
UserGroupInformation.setLoginUser(adminUser);
// Add "s3v" created default by OM.
checkUser(cluster, adminUser, Arrays.asList("volume1", "volume2",
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index e58af8b..258564c 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -226,6 +226,7 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVA
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
import org.apache.hadoop.util.Time;
@@ -528,6 +529,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
authorizer.setKeyManager(keyManager);
authorizer.setPrefixManager(prefixManager);
authorizer.setOzoneAdmins(getOzoneAdmins(configuration));
+ authorizer.setAllowListAllVolumes(allowListAllVolumes);
}
} else {
accessAuthorizer = null;
@@ -1627,7 +1629,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
ProtobufRpcEngine.Server.getRemoteUser(),
ProtobufRpcEngine.Server.getRemoteIp(),
ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
- true);
+ true, getVolumeOwner(vol, acl));
}
/**
@@ -1642,25 +1644,46 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
UserGroupInformation.createRemoteUser(userName),
ProtobufRpcEngine.Server.getRemoteIp(),
ProtobufRpcEngine.Server.getRemoteIp().getHostName(),
- false);
+ false, getVolumeOwner(vol, acl));
} catch (OMException ex) {
// Should not trigger exception here at all
return false;
}
}
- /**
- * CheckAcls for the ozone object.
- *
- * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
- */
- @SuppressWarnings("parameternumber")
- public void checkAcls(ResourceType resType, StoreType storeType,
- ACLType aclType, String vol, String bucket, String key,
- UserGroupInformation ugi, InetAddress remoteAddress, String hostName)
- throws OMException {
- checkAcls(resType, storeType, aclType, vol, bucket, key,
- ugi, remoteAddress, hostName, true);
+ public String getVolumeOwner(String vol, ACLType type) throws OMException {
+ String volOwnerName = null;
+ if (!vol.equals(OzoneConsts.OZONE_ROOT) && (type != ACLType.CREATE)) {
+ volOwnerName = getVolumeOwner(vol);
+ }
+ return volOwnerName;
+ }
+
+ private String getVolumeOwner(String volume) throws OMException {
+ Boolean lockAcquired = metadataManager.getLock().acquireReadLock(
+ VOLUME_LOCK, volume);
+ String dbVolumeKey = metadataManager.getVolumeKey(volume);
+ OmVolumeArgs volumeArgs = null;
+ try {
+ volumeArgs = metadataManager.getVolumeTable().get(dbVolumeKey);
+ } catch (IOException ioe) {
+ if (ioe instanceof OMException) {
+ throw (OMException)ioe;
+ } else {
+ throw new OMException("getVolumeOwner for Volume " + volume + " failed",
+ ResultCodes.INTERNAL_ERROR);
+ }
+ } finally {
+ if (lockAcquired) {
+ metadataManager.getLock().releaseReadLock(VOLUME_LOCK, volume);
+ }
+ }
+ if (volumeArgs != null) {
+ return volumeArgs.getOwnerName();
+ } else {
+ throw new OMException("Volume " + volume + " is not found",
+ OMException.ResultCodes.VOLUME_NOT_FOUND);
+ }
}
/**
@@ -1671,10 +1694,10 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
* and throwOnPermissionDenied set to true.
*/
@SuppressWarnings("parameternumber")
- private boolean checkAcls(ResourceType resType, StoreType storeType,
+ public boolean checkAcls(ResourceType resType, StoreType storeType,
ACLType aclType, String vol, String bucket, String key,
UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
- boolean throwIfPermissionDenied)
+ boolean throwIfPermissionDenied, String volumeOwner)
throws OMException {
OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
.setResType(resType)
@@ -1688,13 +1711,17 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
.setHost(hostName)
.setAclType(ACLIdentityType.USER)
.setAclRights(aclType)
+ .setOwnerName(volumeOwner)
.build();
if (!accessAuthorizer.checkAccess(obj, context)) {
if (throwIfPermissionDenied) {
LOG.warn("User {} doesn't have {} permission to access {} /{}/{}/{}",
- ugi.getUserName(), aclType, resType, vol, bucket, key);
- throw new OMException("User " + ugi.getUserName() + " doesn't have " +
- aclType + " permission to access " + resType,
+ context.getClientUgi().getUserName(), context.getAclRights(),
+ obj.getResourceType(), obj.getVolumeName(), obj.getBucketName(),
+ obj.getKeyName());
+ throw new OMException("User " + context.getClientUgi().getUserName() +
+ " doesn't have " + context.getAclRights() +
+ " permission to access " + obj.getResourceType(),
ResultCodes.PERMISSION_DENIED);
}
return false;
@@ -3497,7 +3524,6 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
return new ResolvedBucket(requested, resolved);
}
-
public ResolvedBucket resolveBucketLink(Pair<String, String> requested)
throws IOException {
@@ -3548,9 +3574,11 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
if (isAclEnabled) {
- checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ,
+ final ACLType type = ACLType.READ;
+ checkAcls(ResourceType.BUCKET, StoreType.OZONE, type,
volumeName, bucketName, null, userGroupInformation,
- remoteAddress, hostName);
+ remoteAddress, hostName, true,
+ getVolumeOwner(volumeName, type));
}
return resolveBucketLink(
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 728a624..674ee08 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -148,12 +148,36 @@ public abstract class OMClientRequest implements
RequestAuditor {
* @param key
* @throws IOException
*/
+ @SuppressWarnings("parameternumber")
public void checkAcls(OzoneManager ozoneManager,
OzoneObj.ResourceType resType,
OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
String vol, String bucket, String key) throws IOException {
+ checkAcls(ozoneManager, resType, storeType, aclType, vol, bucket, key,
+ ozoneManager.getVolumeOwner(vol, aclType));
+ }
+
+ /**
+ * Check Acls of ozone object with volOwner given.
+ * @param ozoneManager
+ * @param resType
+ * @param storeType
+ * @param aclType
+ * @param vol
+ * @param bucket
+ * @param key
+ * @param volOwner
+ * @throws IOException
+ */
+ @SuppressWarnings("parameternumber")
+ public void checkAcls(OzoneManager ozoneManager,
+ OzoneObj.ResourceType resType,
+ OzoneObj.StoreType storeType, IAccessAuthorizer.ACLType aclType,
+ String vol, String bucket, String key, String volOwner)
+ throws IOException {
ozoneManager.checkAcls(resType, storeType, aclType, vol, bucket, key,
- createUGI(), getRemoteAddress(), getHostName());
+ createUGI(), getRemoteAddress(), getHostName(), true,
+ volOwner);
}
/**
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index a048533..ee48f9b 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -198,6 +198,19 @@ public abstract class OMKeyRequest extends OMClientRequest
{
}
}
+ // For keys batch delete and rename only
+ protected String getVolumeOwner(OMMetadataManager omMetadataManager,
+ String volumeName) throws IOException {
+ String dbVolumeKey = omMetadataManager.getVolumeKey(volumeName);
+ OmVolumeArgs volumeArgs =
+ omMetadataManager.getVolumeTable().get(dbVolumeKey);
+ if (volumeArgs == null) {
+ throw new OMException("Volume not found " + volumeName,
+ VOLUME_NOT_FOUND);
+ }
+ return volumeArgs.getOwnerName();
+ }
+
protected static Optional<FileEncryptionInfo> getFileEncryptionInfo(
OzoneManager ozoneManager, OmBucketInfo bucketInfo) throws IOException {
Optional<FileEncryptionInfo> encInfo = Optional.absent();
@@ -437,6 +450,27 @@ public abstract class OMKeyRequest extends OMClientRequest
{
}
/**
+ * Check Acls for the ozone key with volumeOwner.
+ * @param ozoneManager
+ * @param volume
+ * @param bucket
+ * @param key
+ * @param aclType
+ * @param resourceType
+ * @throws IOException
+ */
+ @SuppressWarnings("parameternumber")
+ protected void checkKeyAcls(OzoneManager ozoneManager, String volume,
+ String bucket, String key, IAccessAuthorizer.ACLType aclType,
+ OzoneObj.ResourceType resourceType, String volumeOwner)
+ throws IOException {
+ if (ozoneManager.getAclsEnabled()) {
+ checkAcls(ozoneManager, resourceType, OzoneObj.StoreType.OZONE, aclType,
+ volume, bucket, key, volumeOwner);
+ }
+ }
+
+ /**
* Check ACLs for Ozone Key in OpenKey table
* if ozone native authorizer is enabled.
* @param ozoneManager
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
index c6e7b9b..71e15f5 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -124,6 +124,7 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
volumeName, bucketName);
// Validate bucket and volume exists or not.
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+ String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
for (indexFailed = 0; indexFailed < length; indexFailed++) {
String keyName = deleteKeyArgs.getKeys(indexFailed);
@@ -143,7 +144,8 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
try {
// check Acl
checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
- IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+ IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY,
+ volumeOwner);
omKeyInfoList.add(omKeyInfo);
} catch (Exception ex) {
deleteStatus = false;
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
index abaa4ae..556c6f5 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
@@ -117,6 +117,9 @@ public class OMKeysRenameRequest extends OMKeyRequest {
omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
volumeName, bucketName);
+ // Validate bucket and volume exists or not.
+ validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+ String volumeOwner = getVolumeOwner(omMetadataManager, volumeName);
for (RenameKeysMap renameKey : renameKeysArgs.getRenameKeysMapList()) {
fromKeyName = renameKey.getFromKeyName();
@@ -137,9 +140,11 @@ public class OMKeysRenameRequest extends OMKeyRequest {
// check Acls to see if user has access to perform delete operation
// on old key and create operation on new key
checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
- IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+ IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY,
+ volumeOwner);
checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
- IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+ IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY,
+ volumeOwner);
} catch (Exception ex) {
renameStatus = false;
unRenamedKeys.add(
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
index e928402..6fbd7d2 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
@@ -29,7 +29,9 @@ import
org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.ObjectParser;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -71,6 +73,11 @@ public abstract class OMPrefixAclRequest extends
OMClientRequest {
(PrefixManagerImpl) ozoneManager.getPrefixManager();
try {
String prefixPath = getOzoneObj().getPath();
+ ObjectParser objectParser = new ObjectParser(prefixPath,
+ OzoneManagerProtocolProtos.OzoneObj.ObjectType.PREFIX);
+ volume = objectParser.getVolume();
+ bucket = objectParser.getBucket();
+ key = objectParser.getKey();
// check Acl
if (ozoneManager.getAclsEnabled()) {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
index c12cdac..9b82702 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/util/ObjectParser.java
@@ -43,7 +43,6 @@ public class ObjectParser {
public ObjectParser(String path, ObjectType objectType) throws OMException {
Preconditions.checkNotNull(path);
String[] tokens = StringUtils.split(path, OZONE_URI_DELIMITER, 3);
-
if (objectType == ObjectType.VOLUME && tokens.length == 1) {
volume = tokens[0];
} else if (objectType == ObjectType.BUCKET && tokens.length == 2) {
@@ -53,6 +52,11 @@ public class ObjectParser {
volume = tokens[0];
bucket = tokens[1];
key = tokens[2];
+ } else if (objectType == ObjectType.PREFIX && tokens.length >= 1) {
+ volume = tokens[0];
+ if (tokens.length >= 2) {
+ bucket = tokens[1];
+ }
} else {
throw new OMException("Illegal path " + path,
OMException.ResultCodes.INVALID_PATH_IN_ACL_REQUEST);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
index df98e20..aebefdc 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
@@ -50,6 +50,7 @@ public class OzoneNativeAuthorizer implements
IAccessAuthorizer {
private KeyManager keyManager;
private PrefixManager prefixManager;
private Collection<String> ozAdmins;
+ private boolean allowListAllVolumes;
public OzoneNativeAuthorizer() {
}
@@ -87,14 +88,18 @@ public class OzoneNativeAuthorizer implements
IAccessAuthorizer {
"configured to work with OzoneObjInfo type only.", INVALID_REQUEST);
}
- // by pass all checks for admin
+ // bypass all checks for admin
boolean isAdmin = isAdmin(context.getClientUgi());
if (isAdmin) {
return true;
}
+ boolean isOwner = isOwner(context.getClientUgi(), context.getOwnerName());
boolean isListAllVolume = ((context.getAclRights() == ACLType.LIST) &&
objInfo.getVolumeName().equals(OzoneConsts.OZONE_ROOT));
+ if (isListAllVolume) {
+ return getAllowListAllVolumes();
+ }
// For CREATE and DELETE acl requests, the parents need to be checked
// for WRITE acl. If Key create request is received, then we need to
@@ -114,13 +119,19 @@ public class OzoneNativeAuthorizer implements
IAccessAuthorizer {
switch (objInfo.getResourceType()) {
case VOLUME:
LOG.trace("Checking access for volume: {}", objInfo);
- if (isACLTypeCreate || isListAllVolume) {
+ if (isACLTypeCreate) {
// only admin is allowed to create volume and list all volumes
return false;
}
- return volumeManager.checkAccess(objInfo, context);
+ boolean volumeAccess = isOwner ||
+ volumeManager.checkAccess(objInfo, context);
+ return volumeAccess;
case BUCKET:
LOG.trace("Checking access for bucket: {}", objInfo);
+ // Skip check for volume owner
+ if (isOwner) {
+ return true;
+ }
// Skip bucket access check for CREATE acl since
// bucket will not exist at the time of creation
boolean bucketAccess = isACLTypeCreate
@@ -129,6 +140,10 @@ public class OzoneNativeAuthorizer implements
IAccessAuthorizer {
&& volumeManager.checkAccess(objInfo, parentContext));
case KEY:
LOG.trace("Checking access for Key: {}", objInfo);
+ // Skip check for volume owner
+ if (isOwner) {
+ return true;
+ }
// Skip key access check for CREATE acl since
// key will not exist at the time of creation
boolean keyAccess = isACLTypeCreate
@@ -139,6 +154,10 @@ public class OzoneNativeAuthorizer implements
IAccessAuthorizer {
&& volumeManager.checkAccess(objInfo, parentContext));
case PREFIX:
LOG.trace("Checking access for Prefix: {}", objInfo);
+ // Skip check for volume owner
+ if (isOwner) {
+ return true;
+ }
// Skip prefix access check for CREATE acl since
// prefix will not exist at the time of creation
boolean prefixAccess = isACLTypeCreate
@@ -176,6 +195,25 @@ public class OzoneNativeAuthorizer implements
IAccessAuthorizer {
return Collections.unmodifiableCollection(this.ozAdmins);
}
+ public void setAllowListAllVolumes(boolean allowListAllVolumes) {
+ this.allowListAllVolumes = allowListAllVolumes;
+ }
+
+ public boolean getAllowListAllVolumes() {
+ return allowListAllVolumes;
+ }
+
+ private boolean isOwner(UserGroupInformation callerUgi, String ownerName) {
+ if (ownerName == null) {
+ return false;
+ }
+ if (callerUgi.getUserName().equals(ownerName) ||
+ callerUgi.getShortUserName().equals(ownerName)) {
+ return true;
+ }
+ return false;
+ }
+
private boolean isAdmin(UserGroupInformation callerUgi) {
if (ozAdmins == null) {
return false;
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
new file mode 100644
index 0000000..cb7471d
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/acl/TestVolumeOwner.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.security.acl;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.ozone.om.BucketManagerImpl;
+import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.PrefixManager;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl;
+import org.apache.hadoop.ozone.om.VolumeManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
+import static
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Test Ozone owner check from OzoneNativeAuthorizer.
+ *
+ * <p>The fixture holds two volumes (vol0, vol1) whose owners are owner0 and
+ * owner1, two buckets under each volume and two keys under each bucket.
+ * The tests then ask the native authorizer what the admin user ("om"), the
+ * volume owners and other users may do on those objects.
+ */
+public class TestVolumeOwner {
+
+  private static OzoneConfiguration ozoneConfig;
+  private static OzoneNativeAuthorizer nativeAuthorizer;
+  private static KeyManagerImpl keyManager;
+  private static VolumeManagerImpl volumeManager;
+  private static BucketManagerImpl bucketManager;
+  private static PrefixManager prefixManager;
+  private static OMMetadataManager metadataManager;
+  private static UserGroupInformation testUgi;
+
+  /**
+   * Builds the OM managers on top of a randomized metadata directory, wires
+   * them into a native authorizer created with "om" as its admin list, and
+   * populates the test volumes, buckets and keys.
+   *
+   * @throws IOException if the metadata store or the test data cannot be
+   *                     created
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    ozoneConfig = new OzoneConfiguration();
+    ozoneConfig.set(OZONE_ACL_AUTHORIZER_CLASS,
+        OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+    File dir = GenericTestUtils.getRandomizedTestDir();
+    ozoneConfig.set(OZONE_METADATA_DIRS, dir.toString());
+
+    // The metadata manager must exist first: every other manager wraps it.
+    metadataManager = new OmMetadataManagerImpl(ozoneConfig);
+    volumeManager = new VolumeManagerImpl(metadataManager, ozoneConfig);
+    bucketManager = new BucketManagerImpl(metadataManager);
+    keyManager = new KeyManagerImpl(mock(ScmBlockLocationProtocol.class),
+        metadataManager, ozoneConfig, "om1", null);
+    prefixManager = new PrefixManagerImpl(metadataManager, false);
+
+    nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
+        keyManager, prefixManager,
+        Collections.singletonList("om"));
+
+    // testUgi only supplies the user/group names for the key ACLs below.
+    testUgi = UserGroupInformation.createUserForTesting("testuser",
+        new String[]{"test"});
+
+    prepareTestVols();
+    prepareTestBuckets();
+    prepareTestKeys();
+  }
+
+  /**
+   * Creates 2 volumes, vol0 and vol1, owned by owner0 and owner1.
+   */
+  private static void prepareTestVols() throws IOException {
+    for (int i = 0; i < 2; i++) {
+      OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
+          .setVolume(getTestVolumeName(i))
+          .setAdminName("om")
+          .setOwnerName(getTestVolOwnerName(i))
+          .build();
+      TestOMRequestUtils.addVolumeToOM(metadataManager, volumeArgs);
+    }
+  }
+
+  /**
+   * Creates 2 buckets under each test volume.
+   */
+  private static void prepareTestBuckets() throws IOException {
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(getTestVolumeName(i))
+            .setBucketName(getTestBucketName(j))
+            .build();
+        TestOMRequestUtils.addBucketToOM(metadataManager, bucketInfo);
+      }
+    }
+  }
+
+  /**
+   * Creates 2 keys under each test bucket. key0 carries ALL rights for the
+   * test user and its groups, key1 carries NONE.
+   */
+  private static void prepareTestKeys() throws IOException {
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        for (int k = 0; k < 2; k++) {
+          IAccessAuthorizer.ACLType rights = (k == 0) ? ALL : NONE;
+          OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+              .setVolumeName(getTestVolumeName(i))
+              .setBucketName(getTestBucketName(j))
+              .setKeyName(getTestKeyName(k))
+              .setFactor(HddsProtos.ReplicationFactor.ONE)
+              .setDataSize(0)
+              .setType(HddsProtos.ReplicationType.STAND_ALONE)
+              .setAcls(OzoneAclUtil.getAclList(testUgi.getUserName(),
+                  testUgi.getGroupNames(), rights, rights))
+              .build();
+          OpenKeySession keySession = keyManager.createFile(keyArgs, true,
+              false);
+          keyArgs.setLocationInfoList(
+              keySession.getKeyInfo().getLatestVersionLocations()
+                  .getLocationList());
+          keyManager.commitKey(keyArgs, keySession.getId());
+        }
+      }
+    }
+  }
+
+  /**
+   * Volume level checks: CREATE is expected to be granted to the admin only
+   * (even the owner is refused), while every other non-NONE right is
+   * expected to be granted to the volume owner.
+   */
+  @Test
+  public void testVolumeOps() throws Exception {
+    OzoneObj vol0 = getTestVolumeobj(0);
+
+    // admin = true, owner = false, ownerName = owner0
+    RequestContext adminContext = getUserRequestContext("om",
+        CREATE, false, getTestVolOwnerName(0));
+    Assert.assertTrue("matching admins are allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0, adminContext));
+
+    // admin = true, owner = false, ownerName = null
+    // Use a dedicated context so the null owner path is really exercised.
+    RequestContext adminNullOwnerContext = getUserRequestContext("om",
+        CREATE, false, null);
+    Assert.assertTrue("matching admins are allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0,
+        adminNullOwnerContext));
+
+    // admin = false, owner = false, ownerName = owner0
+    RequestContext nonAdminNonOwnerContext = getUserRequestContext("testuser",
+        CREATE, false, getTestVolOwnerName(0));
+    Assert.assertFalse("mismatching admins are not allowed to perform admin " +
+        "operations", nativeAuthorizer.checkAccess(vol0,
+        nonAdminNonOwnerContext));
+
+    // admin = false, owner = true
+    RequestContext nonAdminOwnerContext = getUserRequestContext(
+        getTestVolOwnerName(0), CREATE, true, getTestVolOwnerName(0));
+    Assert.assertFalse("mismatching admins are not allowed to perform admin " +
+        "operations even for owner", nativeAuthorizer.checkAccess(vol0,
+        nonAdminOwnerContext));
+
+    // Every right except NONE and CREATE is a non-admin operation.
+    List<IAccessAuthorizer.ACLType> aclsToTest =
+        Arrays.stream(IAccessAuthorizer.ACLType.values()).filter(
+            (type)-> type != NONE && type != CREATE)
+            .collect(Collectors.toList());
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext ownerContext = getUserRequestContext(
+          getTestVolOwnerName(0), type, true, getTestVolOwnerName(0));
+      Assert.assertTrue("Owner is allowed to perform all non-admin " +
+          "operations", nativeAuthorizer.checkAccess(vol0,
+          ownerContext));
+    }
+  }
+
+  /**
+   * Bucket level checks: the owner of the enclosing volume is granted every
+   * non-NONE right, a user that is neither admin nor volume owner is not.
+   */
+  @Test
+  public void testBucketOps() throws Exception {
+    OzoneObj obj = getTestBucketobj(1, 1);
+    List<IAccessAuthorizer.ACLType> aclsToTest = getAclsToTest();
+
+    // admin = false, owner = true
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(1), type, true, getTestVolOwnerName(1));
+      Assert.assertTrue("non admin volume owner without acls are allowed" +
+          " to do " + type + " on bucket",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+
+    // admin = false, owner = false
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminNonOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(1), type, false, getTestVolOwnerName(0));
+      Assert.assertFalse("non admin non volume owner without acls" +
+          " are not allowed to do " + type + " on bucket",
+          nativeAuthorizer.checkAccess(obj, nonAdminNonOwnerContext));
+    }
+  }
+
+  /**
+   * Key level checks on a key whose own ACLs grant NONE: the owner of the
+   * enclosing volume is granted every non-NONE right, a user that is neither
+   * admin nor volume owner is not.
+   */
+  @Test
+  public void testKeyOps() throws Exception {
+    OzoneObj obj = getTestKeyobj(0, 0, 1);
+    List<IAccessAuthorizer.ACLType> aclsToTest = getAclsToTest();
+
+    // admin = false, owner = true
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(0), type, true, getTestVolOwnerName(0));
+      Assert.assertTrue("non admin volume owner without acls are allowed to " +
+          "access key",
+          nativeAuthorizer.checkAccess(obj, nonAdminOwnerContext));
+    }
+
+    // admin = false, owner = false
+    for (IAccessAuthorizer.ACLType type: aclsToTest) {
+      RequestContext nonAdminNonOwnerContext = getUserRequestContext(
+          getTestVolOwnerName(0), type, false, getTestVolOwnerName(1));
+      Assert.assertFalse("non admin non volume owner without acls are" +
+          " not allowed to access key",
+          nativeAuthorizer.checkAccess(obj, nonAdminNonOwnerContext));
+    }
+  }
+
+  /**
+   * Builds a request context for a remote user.
+   *
+   * @param username  name of the calling user
+   * @param type      ACL right being requested
+   * @param isOwner   not used when building the context; it only records at
+   *                  the call site whether the caller is meant to be the
+   *                  owner
+   * @param ownerName owner name placed in the context, may be null
+   * @return the request context
+   */
+  private RequestContext getUserRequestContext(String username,
+      IAccessAuthorizer.ACLType type, boolean isOwner, String ownerName) {
+    return RequestContext.getBuilder(
+        UserGroupInformation.createRemoteUser(username), null, null,
+        type, ownerName).build();
+  }
+
+  /** Returns the name of the test volume with the given index. */
+  private static String getTestVolumeName(int index) {
+    return "vol" + index;
+  }
+
+  /** Returns the owner name of the test volume with the given index. */
+  private static String getTestVolOwnerName(int index) {
+    return "owner" + index;
+  }
+
+  /** Returns the name of the test bucket with the given index. */
+  private static String getTestBucketName(int index) {
+    return "bucket" + index;
+  }
+
+  /** Returns the name of the test key with the given index. */
+  private static String getTestKeyName(int index) {
+    return "key" + index;
+  }
+
+  /** Builds the OzoneObj describing the test volume with the given index. */
+  private OzoneObj getTestVolumeobj(int index) {
+    return OzoneObjInfo.Builder.getBuilder(OzoneObj.ResourceType.VOLUME,
+        OzoneObj.StoreType.OZONE,
+        getTestVolumeName(index), null, null).build();
+  }
+
+  /** Builds the OzoneObj describing a test bucket under a test volume. */
+  private OzoneObj getTestBucketobj(int volIndex, int bucketIndex) {
+    return OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.BUCKET)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(getTestVolumeName(volIndex))
+        .setBucketName(getTestBucketName(bucketIndex)).build();
+  }
+
+  /** Builds the OzoneObj describing a test key under a test bucket. */
+  private OzoneObj getTestKeyobj(int volIndex, int bucketIndex,
+      int keyIndex) {
+    return OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(getTestVolumeName(volIndex))
+        .setBucketName(getTestBucketName(bucketIndex))
+        .setKeyName(getTestKeyName(keyIndex))
+        .build();
+  }
+
+  /**
+   * Lists the ACL rights exercised by the bucket and key tests.
+   *
+   * @return every ACLType value except NONE
+   */
+  List<IAccessAuthorizer.ACLType> getAclsToTest() {
+    return Arrays.stream(IAccessAuthorizer.ACLType.values()).filter(
+        (type)-> type != NONE).collect(Collectors.toList());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]