This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new f4a1e07 ATLAS-3836 Add Apache Ozone support in hive hook f4a1e07 is described below commit f4a1e078006985c21883f7cef00e7cf94750d96e Author: Nikhil Bonte <nikhil.bo...@freestoneinfotech.com> AuthorDate: Fri Jun 26 12:20:09 2020 +0530 ATLAS-3836 Add Apache Ozone support in hive hook Signed-off-by: Sarath Subramanian <sar...@apache.org> (cherry picked from commit 8b50ac0c7d8eba8efa0deb97f1fee68d382f11ad) --- .../atlas/hive/hook/AtlasHiveHookContext.java | 5 +- .../java/org/apache/atlas/hive/hook/HiveHook.java | 8 +- .../atlas/hive/hook/events/BaseHiveEvent.java | 309 +------------ .../apache/atlas/utils/AtlasPathExtractorUtil.java | 493 +++++++++++++++++++++ .../apache/atlas/utils/PathExtractorContext.java | 74 ++++ .../atlas/utils/AtlasPathExtractorUtilTest.java | 176 ++++++++ 6 files changed, 763 insertions(+), 302 deletions(-) diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index d0b9393..1286471 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -157,6 +157,7 @@ public class AtlasHiveHookContext { public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); } + public Map<String, AtlasEntity> getQNameToEntityMap() { return qNameEntityMap; } public String getMetadataNamespace() { return hook.getMetadataNamespace(); @@ -168,8 +169,8 @@ public class AtlasHiveHookContext { return hook.isConvertHdfsPathToLowerCase(); } - public boolean isAwsS3AtlasModelVersionV2() { - return hook.isAwsS3AtlasModelVersionV2(); + public String getAwsS3AtlasModelVersion() { + return hook.getAwsS3AtlasModelVersion(); } public boolean getSkipHiveColumnLineageHive20633() { diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 3aa5c3b..6513234 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -77,7 +77,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final int nameCacheDatabaseMaxCount; private static final int nameCacheTableMaxCount; private static final int nameCacheRebuildIntervalSeconds; - private static final boolean isAwsS3AtlasModelVersionV2; + private static final String awsS3AtlasModelVersion; private static final boolean skipHiveColumnLineageHive20633; private static final int skipHiveColumnLineageHive20633InputsThreshold; @@ -101,7 +101,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000); nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000); nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default - isAwsS3AtlasModelVersionV2 = StringUtils.equalsIgnoreCase(atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2), HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2); + awsS3AtlasModelVersion = atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2); skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false); skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 hiveProcessPopulateDeprecatedAttributes = atlasProperties.getBoolean(HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES, false); @@ -257,7 +257,9 @@ public class 
HiveHook extends AtlasHook implements ExecuteWithHookContext { return convertHdfsPathToLowerCase; } - public boolean isAwsS3AtlasModelVersionV2() { return isAwsS3AtlasModelVersionV2; } + public String getAwsS3AtlasModelVersion() { + return awsS3AtlasModelVersion; + } public boolean getSkipHiveColumnLineageHive20633() { return skipHiveColumnLineageHive20633; diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index c57e2ea..a532c48 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -21,6 +21,7 @@ package org.apache.atlas.hive.hook.events; import com.google.common.collect.ImmutableMap; import org.apache.atlas.hive.hook.AtlasHiveHookContext; import org.apache.atlas.hive.hook.HiveHook.PreprocessAction; +import org.apache.atlas.utils.PathExtractorContext; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; @@ -31,6 +32,7 @@ import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.repository.Constants; import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AtlasPathExtractorUtil; import org.apache.atlas.utils.HdfsNameServiceResolver; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -76,25 +78,8 @@ public abstract class BaseHiveEvent { public static final String HIVE_TYPE_PROCESS_EXECUTION = "hive_process_execution"; public static final String HIVE_DB_DDL = "hive_db_ddl"; public static final String HIVE_TABLE_DDL = "hive_table_ddl"; - public static final String HDFS_TYPE_PATH = "hdfs_path"; 
public static final String HBASE_TYPE_TABLE = "hbase_table"; public static final String HBASE_TYPE_NAMESPACE = "hbase_namespace"; - public static final String AWS_S3_BUCKET = "aws_s3_bucket"; - public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir"; - public static final String AWS_S3_OBJECT = "aws_s3_object"; - public static final String AWS_S3_V2_BUCKET = "aws_s3_v2_bucket"; - public static final String AWS_S3_V2_PSEUDO_DIR = "aws_s3_v2_directory"; - public static final String ADLS_GEN2_ACCOUNT = "adls_gen2_account"; - public static final String ADLS_GEN2_CONTAINER = "adls_gen2_container"; - public static final String ADLS_GEN2_DIRECTORY = "adls_gen2_directory"; - public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX = ".dfs.core.windows.net"; - - public static final String SCHEME_SEPARATOR = "://"; - public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR; - public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR; - public static final String ABFS_SCHEME = "abfs" + SCHEME_SEPARATOR; - public static final String ABFSS_SCHEME = "abfss" + SCHEME_SEPARATOR; - public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; public static final String ATTRIBUTE_NAME = "name"; public static final String ATTRIBUTE_DESCRIPTION = "description"; @@ -148,16 +133,10 @@ public abstract class BaseHiveEvent { public static final String ATTRIBUTE_URI = "uri"; public static final String ATTRIBUTE_STORAGE_HANDLER = "storage_handler"; public static final String ATTRIBUTE_NAMESPACE = "namespace"; - public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix"; - public static final String ATTRIBUTE_BUCKET = "bucket"; - public static final String ATTRIBUTE_CONTAINER = "container"; public static final String ATTRIBUTE_HOSTNAME = "hostName"; public static final String ATTRIBUTE_EXEC_TIME = "execTime"; public static final String ATTRIBUTE_DDL_QUERIES = "ddlQueries"; public static final String ATTRIBUTE_SERVICE_TYPE = "serviceType"; - public 
static final String ATTRIBUTE_ACCOUNT = "account"; - public static final String ATTRIBUTE_PARENT = "parent"; - public static final String HBASE_STORAGE_HANDLER_CLASS = "org.apache.hadoop.hive.hbase.HBaseStorageHandler"; public static final String HBASE_DEFAULT_NAMESPACE = "default"; public static final String HBASE_NAMESPACE_TABLE_DELIMITER = ":"; @@ -173,14 +152,10 @@ public abstract class BaseHiveEvent { public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS = "hive_table_partitionkeys"; public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS = "hive_table_columns"; public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC = "hive_table_storagedesc"; - public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs"; - public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = "aws_s3_v2_container_contained"; public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE = "hive_process_process_executions"; public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES = "hive_db_ddl_queries"; public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES = "hive_table_ddl_queries"; public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE = "hbase_table_namespace"; - public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS = "adls_gen2_account_containers"; - public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN = "adls_gen2_parent_children"; public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new HashMap<>(); @@ -587,52 +562,21 @@ public abstract class BaseHiveEvent { } protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) { - AtlasEntity ret; - String strPath = path.toString(); - String metadataNamespace = getMetadataNamespace(); - - if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) { - strPath = strPath.toLowerCase(); - } - - if (isS3Path(strPath)) { - if (context.isAwsS3AtlasModelVersionV2()) { 
- ret = addS3PathEntityV2(path, strPath, extInfo); - } else { - ret = addS3PathEntityV1(path, strPath, extInfo); - } - } else if (isAbfsPath(strPath)) { - ret = addAbfsPathEntity(path, strPath, extInfo); - } else { - String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath); - String attrPath = StringUtils.isEmpty(nameServiceID) ? strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath); - String pathQualifiedName = getQualifiedName(attrPath); - - ret = context.getEntity(pathQualifiedName); - - if (ret == null) { - ret = new AtlasEntity(HDFS_TYPE_PATH); - - if (StringUtils.isNotEmpty(nameServiceID)) { - ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID); - } - - String name = Path.getPathWithoutSchemeAndAuthority(path).toString(); + String strPath = path.toString(); + String metadataNamespace = getMetadataNamespace(); + boolean isConvertPathToLowerCase = strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase(); + PathExtractorContext pathExtractorContext = new PathExtractorContext(metadataNamespace, context.getQNameToEntityMap(), + isConvertPathToLowerCase, context.getAwsS3AtlasModelVersion()); - if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) { - name = name.toLowerCase(); - } - - ret.setAttribute(ATTRIBUTE_PATH, attrPath); - ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); - ret.setAttribute(ATTRIBUTE_NAME, name); - ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext); - context.putEntity(pathQualifiedName, ret); + if (entityWithExtInfo.getReferredEntities() != null){ + for (AtlasEntity entity : entityWithExtInfo.getReferredEntities().values()) { + extInfo.addReferredEntity(entity); } } - return ret; + return entityWithExtInfo.getEntity(); } protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) 
throws Exception { @@ -1148,235 +1092,6 @@ public abstract class BaseHiveEvent { return false; } - private boolean isS3Path(String strPath) { - return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME)); - } - - private boolean isAbfsPath(String strPath) { - return strPath != null && (strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME)); - } - - private AtlasEntity addS3PathEntityV1(Path path, String strPath, AtlasEntityExtInfo extInfo) { - String metadataNamespace = getMetadataNamespace(); - String bucketName = path.toUri().getAuthority(); - String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; - String pathQualifiedName = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; - AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); - AtlasEntity ret = context.getEntity(pathQualifiedName); - - if (ret == null) { - if (bucketEntity == null) { - bucketEntity = new AtlasEntity(AWS_S3_BUCKET); - - bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); - bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); - - context.putEntity(bucketQualifiedName, bucketEntity); - } - - extInfo.addReferredEntity(bucketEntity); - - ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); - - ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); - ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); - ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); - ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); - - context.putEntity(pathQualifiedName, ret); - } - - return ret; - } - - private AtlasEntity addS3PathEntityV2(Path path, String strPath, AtlasEntityExtInfo extInfo) { - if 
(LOG.isDebugEnabled()) { - LOG.debug("==> addS3PathEntityV2(strPath={})", strPath); - } - - String metadataNamespace = getMetadataNamespace(); - String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; - AtlasEntity ret = context.getEntity(pathQualifiedName); - - if (ret == null) { - String bucketName = path.toUri().getAuthority(); - String schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase(); - String bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; - AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); - - if (bucketEntity == null) { - bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET); - - bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); - bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); - - if (LOG.isDebugEnabled()) { - LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); - } - - context.putEntity(bucketQualifiedName, bucketEntity); - } - - extInfo.addReferredEntity(bucketEntity); - - AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED); - String parentPath = Path.SEPARATOR; - String dirPath = path.toUri().getPath(); - - if (StringUtils.isEmpty(dirPath)) { - dirPath = Path.SEPARATOR; - } - - for (String subDirName : dirPath.split(Path.SEPARATOR)) { - if (StringUtils.isEmpty(subDirName)) { - continue; - } - - String subDirPath = parentPath + subDirName + Path.SEPARATOR; - String subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; - - ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR); - - ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId); - ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath); - ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName); - 
ret.setAttribute(ATTRIBUTE_NAME, subDirName); - - if (LOG.isDebugEnabled()) { - LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); - } - - context.putEntity(subDirQualifiedName, ret); - - parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED); - parentPath = subDirPath; - } - - if (ret == null) { - ret = bucketEntity; - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== addS3PathEntityV2(strPath={})", strPath); - } - - return ret; - } - - private AtlasEntity addAbfsPathEntity(Path path, String strPath, AtlasEntityExtInfo extInfo) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> addAbfsPathEntity(strPath={})", strPath); - } - - String metadataNamespace = getMetadataNamespace(); - String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; - AtlasEntity ret = context.getEntity(pathQualifiedName); - - if (ret == null) { - String abfsScheme = path.toUri().getScheme(); - String storageAcctName = getAbfsStorageAccountName(path.toUri()); - String schemeAndStorageAcctName = (abfsScheme + SCHEME_SEPARATOR + storageAcctName).toLowerCase(); - String storageAcctQualifiedName = schemeAndStorageAcctName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; - AtlasEntity storageAcctEntity = context.getEntity(storageAcctQualifiedName); - - // create adls-gen2 storage-account entity - if (storageAcctEntity == null) { - storageAcctEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT); - - storageAcctEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, storageAcctQualifiedName); - storageAcctEntity.setAttribute(ATTRIBUTE_NAME, storageAcctName); - - if (LOG.isDebugEnabled()) { - LOG.debug("adding entity: typeName={}, qualifiedName={}", storageAcctEntity.getTypeName(), storageAcctEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); - } - - context.putEntity(storageAcctQualifiedName, storageAcctEntity); - } - - extInfo.addReferredEntity(storageAcctEntity); - - 
AtlasRelatedObjectId storageAcctObjId = AtlasTypeUtil.getAtlasRelatedObjectId(storageAcctEntity, RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS); - - // create adls-gen2 container entity linking to storage account - String containerName = path.toUri().getUserInfo(); - String schemeAndContainerName = (abfsScheme + SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + storageAcctName).toLowerCase(); - String containerQualifiedName = schemeAndContainerName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; - AtlasEntity containerEntity = context.getEntity(containerQualifiedName); - - if (containerEntity == null) { - containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER); - - containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, containerQualifiedName); - containerEntity.setAttribute(ATTRIBUTE_NAME, containerName); - containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, storageAcctObjId); - - if (LOG.isDebugEnabled()) { - LOG.debug("adding entity: typeName={}, qualifiedName={}", containerEntity.getTypeName(), containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); - } - - context.putEntity(containerQualifiedName, containerEntity); - } - - extInfo.addReferredEntity(containerEntity); - - // create adls-gen2 directory entity linking to container - AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN); - String parentPath = Path.SEPARATOR; - String dirPath = path.toUri().getPath(); - - if (StringUtils.isEmpty(dirPath)) { - dirPath = Path.SEPARATOR; - } - - for (String subDirName : dirPath.split(Path.SEPARATOR)) { - if (StringUtils.isEmpty(subDirName)) { - continue; - } - - String subDirPath = parentPath + subDirName + Path.SEPARATOR; - String subDirQualifiedName = schemeAndContainerName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; - - ret = new AtlasEntity(ADLS_GEN2_DIRECTORY); - - ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId); - 
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName); - ret.setAttribute(ATTRIBUTE_NAME, subDirName); - - if (LOG.isDebugEnabled()) { - LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); - } - - context.putEntity(subDirQualifiedName, ret); - - parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN); - parentPath = subDirPath; - } - - if (ret == null) { - ret = storageAcctEntity; - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== addAbfsPathEntity(strPath={})", strPath); - } - - return ret; - } - - private String getAbfsStorageAccountName(URI uri) { - String ret = null; - String host = uri.getHost(); - - // host: "<account_name>.dfs.core.windows.net" - if (StringUtils.isNotEmpty(host) && host.contains(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)) { - ret = host.substring(0, host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)); - } - - return ret; - } - static final class EntityComparator implements Comparator<Entity> { @Override public int compare(Entity entity1, Entity entity2) { diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java new file mode 100644 index 0000000..c3276a8 --- /dev/null +++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java @@ -0,0 +1,493 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.utils; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; + +public class AtlasPathExtractorUtil { + private static final Logger LOG = LoggerFactory.getLogger(AtlasPathExtractorUtil.class); + + // Common + public static final char QNAME_SEP_METADATA_NAMESPACE = '@'; + public static final char QNAME_SEP_ENTITY_NAME = '.'; + public static final String SCHEME_SEPARATOR = "://"; + public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + public static final String ATTRIBUTE_NAME = "name"; + public static final String ATTRIBUTE_BUCKET = "bucket"; + + // HDFS + public static final String HDFS_TYPE_PATH = "hdfs_path"; + public static final String ATTRIBUTE_PATH = "path"; + public static final String ATTRIBUTE_CLUSTER_NAME = "clusterName"; + public static final String ATTRIBUTE_NAMESERVICE_ID = "nameServiceId"; + + // AWS S3 + public static final String AWS_S3_ATLAS_MODEL_VERSION_V2 = "v2"; + public static final String AWS_S3_BUCKET = "aws_s3_bucket"; + public static final String AWS_S3_PSEUDO_DIR = "aws_s3_pseudo_dir"; + public static final String AWS_S3_V2_BUCKET = "aws_s3_v2_bucket"; + 
public static final String AWS_S3_V2_PSEUDO_DIR = "aws_s3_v2_directory"; + public static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR; + public static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR; + public static final String ATTRIBUTE_CONTAINER = "container"; + public static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix"; + public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS = "aws_s3_bucket_aws_s3_pseudo_dirs"; + public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = "aws_s3_v2_container_contained"; + + // ADLS Gen2 + public static final String ADLS_GEN2_ACCOUNT = "adls_gen2_account"; + public static final String ADLS_GEN2_CONTAINER = "adls_gen2_container"; + public static final String ADLS_GEN2_DIRECTORY = "adls_gen2_directory"; + public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX = ".dfs.core.windows.net"; + public static final String ABFS_SCHEME = "abfs" + SCHEME_SEPARATOR; + public static final String ABFSS_SCHEME = "abfss" + SCHEME_SEPARATOR; + public static final String ATTRIBUTE_ACCOUNT = "account"; + public static final String ATTRIBUTE_PARENT = "parent"; + public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS = "adls_gen2_account_containers"; + public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN = "adls_gen2_parent_children"; + + // Ozone + public static final String OZONE_VOLUME = "ozone_volume"; + public static final String OZONE_BUCKET = "ozone_bucket"; + public static final String OZONE_KEY = "ozone_key"; + public static final String OZONE_SCHEME = "ofs" + SCHEME_SEPARATOR; + public static final String OZONE_3_SCHEME = "o3fs" + SCHEME_SEPARATOR; + public static final String ATTRIBUTE_VOLUME = "volume"; + public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET = "ozone_volume_buckets"; + public static final String RELATIONSHIP_OZONE_BUCKET_KEY = "ozone_bucket_keys"; + + public static AtlasEntityWithExtInfo getPathEntity(Path path, PathExtractorContext context) { 
+ AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntityWithExtInfo(); + AtlasEntity ret; + String strPath = path.toString(); + + if (context.isConvertPathToLowerCase()) { + strPath = strPath.toLowerCase(); + } + + if (isS3Path(strPath)) { + ret = isAwsS3AtlasModelVersionV2(context) ? addS3PathEntityV2(path, entityWithExtInfo, context) : + addS3PathEntityV1(path, entityWithExtInfo, context); + } else if (isAbfsPath(strPath)) { + ret = addAbfsPathEntity(path, entityWithExtInfo, context); + } else if (isOzonePath(strPath)) { + ret = addOzonePathEntity(path, entityWithExtInfo, context); + } else { + ret = addHDFSPathEntity(path, context); + } + entityWithExtInfo.setEntity(ret); + + return entityWithExtInfo; + } + + private static boolean isAwsS3AtlasModelVersionV2(PathExtractorContext context) { + return StringUtils.isNotEmpty(context.getAwsS3AtlasModelVersion()) && + StringUtils.equalsIgnoreCase(context.getAwsS3AtlasModelVersion(), AWS_S3_ATLAS_MODEL_VERSION_V2); + } + + private static boolean isS3Path(String strPath) { + return strPath != null && (strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME)); + } + + private static boolean isAbfsPath(String strPath) { + return strPath != null && (strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME)); + } + + private static boolean isOzonePath(String strPath) { + return strPath != null && (strPath.startsWith(OZONE_SCHEME) || strPath.startsWith(OZONE_3_SCHEME)); + } + + private static AtlasEntity addS3PathEntityV1(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) { + String strPath = path.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> addS3PathEntityV1(strPath={})", strPath); + } + + String metadataNamespace = context.getMetadataNamespace(); + String bucketName = path.toUri().getAuthority(); + String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + 
String pathQualifiedName = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace; + AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); + AtlasEntity ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + if (bucketEntity == null) { + bucketEntity = new AtlasEntity(AWS_S3_BUCKET); + + bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); + bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); + + context.putEntity(bucketQualifiedName, bucketEntity); + } + + extInfo.addReferredEntity(bucketEntity); + + ret = new AtlasEntity(AWS_S3_PSEUDO_DIR); + + ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS)); + ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); + + context.putEntity(pathQualifiedName, ret); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== addS3PathEntityV1(strPath={})", strPath); + } + + return ret; + } + + private static AtlasEntity addS3PathEntityV2(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) { + String strPath = path.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> addS3PathEntityV2(strPath={})", strPath); + } + + String metadataNamespace = context.getMetadataNamespace(); + String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + String bucketName = path.toUri().getAuthority(); + String schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase(); + String bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity bucketEntity = 
context.getEntity(bucketQualifiedName); + + if (bucketEntity == null) { + bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET); + + bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); + bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(bucketQualifiedName, bucketEntity); + } + + extInfo.addReferredEntity(bucketEntity); + + AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED); + String parentPath = Path.SEPARATOR; + String dirPath = path.toUri().getPath(); + + if (StringUtils.isEmpty(dirPath)) { + dirPath = Path.SEPARATOR; + } + + for (String subDirName : dirPath.split(Path.SEPARATOR)) { + if (StringUtils.isEmpty(subDirName)) { + continue; + } + + String subDirPath = parentPath + subDirName + Path.SEPARATOR; + String subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + + ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR); + + ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId); + ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, subDirName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(subDirQualifiedName, ret); + + parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED); + parentPath = subDirPath; + } + + if (ret == null) { + ret = bucketEntity; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== addS3PathEntityV2(strPath={})", strPath); + } + + return ret; + } + + /** Returns the Atlas entity for an ABFS path. An entity already cached in the context under pathQualifiedName is returned as-is; otherwise the storage-account (ADLS_GEN2_ACCOUNT) and container (ADLS_GEN2_CONTAINER) entities are created or reused and added to extInfo as referred entities, one ADLS_GEN2_DIRECTORY entity is created per non-empty path segment (each linked to its parent, and cached in the context only), and the deepest directory entity is returned. */ private static AtlasEntity addAbfsPathEntity(Path path, 
AtlasEntityExtInfo extInfo, PathExtractorContext context) { + String strPath = path.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> addAbfsPathEntity(strPath={})", strPath); + } + + String metadataNamespace = context.getMetadataNamespace(); + String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + /* NOTE(review): the entities created below are cached under keys built from the lower-cased scheme, container and account name (the host without ADLS_GEN2_ACCOUNT_HOST_SUFFIX) plus a trailing separator, which appears never to equal pathQualifiedName; if so, this lookup cannot hit for a path created by an earlier call -- confirm. */ AtlasEntity ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + String abfsScheme = path.toUri().getScheme(); + String storageAcctName = getAbfsStorageAccountName(path.toUri()); + String schemeAndStorageAcctName = (abfsScheme + SCHEME_SEPARATOR + storageAcctName).toLowerCase(); + String storageAcctQualifiedName = schemeAndStorageAcctName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity storageAcctEntity = context.getEntity(storageAcctQualifiedName); + + // create adls-gen2 storage-account entity + if (storageAcctEntity == null) { + storageAcctEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT); + + storageAcctEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, storageAcctQualifiedName); + storageAcctEntity.setAttribute(ATTRIBUTE_NAME, storageAcctName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", storageAcctEntity.getTypeName(), storageAcctEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(storageAcctQualifiedName, storageAcctEntity); + } + + extInfo.addReferredEntity(storageAcctEntity); + + AtlasRelatedObjectId storageAcctObjId = AtlasTypeUtil.getAtlasRelatedObjectId(storageAcctEntity, RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS); + + // create adls-gen2 container entity linking to storage account + String containerName = path.toUri().getUserInfo(); + String schemeAndContainerName = (abfsScheme + SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + storageAcctName).toLowerCase(); + String containerQualifiedName = schemeAndContainerName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity containerEntity = 
context.getEntity(containerQualifiedName); + + if (containerEntity == null) { + containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER); + + containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, containerQualifiedName); + containerEntity.setAttribute(ATTRIBUTE_NAME, containerName); + containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, storageAcctObjId); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", containerEntity.getTypeName(), containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(containerQualifiedName, containerEntity); + } + + extInfo.addReferredEntity(containerEntity); + + // create adls-gen2 directory entity linking to container + AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN); + String parentPath = Path.SEPARATOR; + String dirPath = path.toUri().getPath(); + + if (StringUtils.isEmpty(dirPath)) { + dirPath = Path.SEPARATOR; + } + + for (String subDirName : dirPath.split(Path.SEPARATOR)) { + if (StringUtils.isEmpty(subDirName)) { + continue; + } + + String subDirPath = parentPath + subDirName + Path.SEPARATOR; + String subDirQualifiedName = schemeAndContainerName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + + ret = new AtlasEntity(ADLS_GEN2_DIRECTORY); + + ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, subDirName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(subDirQualifiedName, ret); + + parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN); + parentPath = subDirPath; + } + + /* With no directory segments this falls back to the storage-account entity, not the container. NOTE(review): the S3 v2 variant above falls back to its bucket (its container) -- confirm the account is intended here. */ if (ret == null) { + ret = storageAcctEntity; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== 
addAbfsPathEntity(strPath={})", strPath); + } + + return ret; + } + + /** Returns the OZONE_KEY entity for an Ozone path, reusing the one cached in the context under pathQualifiedName when present. Otherwise the OZONE_VOLUME and OZONE_BUCKET entities are created or reused (their names are parsed from the URI authority) and added to extInfo as referred entities, and a new OZONE_KEY entity named after the URI path is linked to the bucket and cached under pathQualifiedName. */ private static AtlasEntity addOzonePathEntity(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) { + String strPath = path.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> addOzonePathEntity(strPath={})", strPath); + } + + String metadataNamespace = context.getMetadataNamespace(); + String ozoneScheme = path.toUri().getScheme(); + String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + //create ozone volume entity + String volumeName = getOzoneVolumeName(path); + String volumeQualifiedName = ozoneScheme + SCHEME_SEPARATOR + volumeName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity volumeEntity = context.getEntity(volumeQualifiedName); + + if (volumeEntity == null) { + volumeEntity = new AtlasEntity(OZONE_VOLUME); + + volumeEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, volumeQualifiedName); + volumeEntity.setAttribute(ATTRIBUTE_NAME, volumeName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", volumeEntity.getTypeName(), volumeEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(volumeQualifiedName, volumeEntity); + } + + extInfo.addReferredEntity(volumeEntity); + + //create ozone bucket entity + String bucketName = getOzoneBucketName(path); + String bucketQualifiedName = ozoneScheme + SCHEME_SEPARATOR + volumeName + QNAME_SEP_ENTITY_NAME + bucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); + + if (bucketEntity == null) { + bucketEntity = new AtlasEntity(OZONE_BUCKET); + + bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); + bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); + bucketEntity.setRelationshipAttribute( ATTRIBUTE_VOLUME, 
AtlasTypeUtil.getAtlasRelatedObjectId(volumeEntity, RELATIONSHIP_OZONE_VOLUME_BUCKET)); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(bucketQualifiedName, bucketEntity); + } + + extInfo.addReferredEntity(bucketEntity); + + ret = new AtlasEntity(OZONE_KEY); + + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, path.toUri().getPath()); + ret.setRelationshipAttribute( ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_OZONE_BUCKET_KEY)); + + context.putEntity(pathQualifiedName, ret); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== addOzonePathEntity(strPath={})", strPath); + } + + return ret; + } + + /** Returns the HDFS_TYPE_PATH entity for a path, reusing the one cached in the context under its qualified name when present. The path and the entity name are lower-cased when the context requests it; when HdfsNameServiceResolver reports a nameservice ID, the path attribute uses the nameservice form of the path and the ID is recorded. This method adds no referred entities. */ private static AtlasEntity addHDFSPathEntity(Path path, PathExtractorContext context) { + String strPath = path.toString(); + + if (context.isConvertPathToLowerCase()) { + strPath = strPath.toLowerCase(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("==> addHDFSPathEntity(strPath={})", strPath); + } + + String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(strPath); + String attrPath = StringUtils.isEmpty(nameServiceID) ? 
strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath); + String pathQualifiedName = getQualifiedName(attrPath, context.getMetadataNamespace()); + AtlasEntity ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + ret = new AtlasEntity(HDFS_TYPE_PATH); + + if (StringUtils.isNotEmpty(nameServiceID)) { + ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID); + } + + String name = Path.getPathWithoutSchemeAndAuthority(path).toString(); + + if (context.isConvertPathToLowerCase()) { + name = name.toLowerCase(); + } + + ret.setAttribute(ATTRIBUTE_PATH, attrPath); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, name); + ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getMetadataNamespace()); + + context.putEntity(pathQualifiedName, ret); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== addHDFSPathEntity(strPath={})", strPath); + } + + return ret; + } + + /** Returns the part of the URI host before ADLS_GEN2_ACCOUNT_HOST_SUFFIX, or null when the host is empty or does not contain that suffix. */ private static String getAbfsStorageAccountName(URI uri) { + String ret = null; + String host = uri.getHost(); + + // host: "<account_name>.dfs.core.windows.net" + if (StringUtils.isNotEmpty(host) && host.contains(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)) { + ret = host.substring(0, host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)); + } + + return ret; + } + + /** Returns the second '.'-separated token of the URI authority. NOTE(review): throws NullPointerException when the URI has no authority and ArrayIndexOutOfBoundsException when the authority contains no '.', with no descriptive error; also confirm this authority layout holds for ofs:// URIs, which may carry volume and bucket in the path rather than in the authority. */ private static String getOzoneVolumeName(Path path) { + String pathAuthority = path.toUri().getAuthority(); + // pathAuthority: "<bucket_name>.<volume_name>.<ozone.service.id>" + return pathAuthority.split("\\.")[1]; + } + + /** Returns the first '.'-separated token of the URI authority, i.e. the bucket under the layout noted in getOzoneVolumeName. NOTE(review): throws NullPointerException when the URI has no authority. */ private static String getOzoneBucketName(Path path) { + String pathAuthority = path.toUri().getAuthority(); + return pathAuthority.split("\\.")[0]; + } + + /** Paths starting with HdfsNameServiceResolver.HDFS_SCHEME get the metadata-namespace suffix appended; any other path is only lower-cased and carries no namespace suffix. NOTE(review): confirm the missing suffix for non-HDFS paths is intended. */ private static String getQualifiedName(String path, String metadataNamespace) { + if (path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) { + return path + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + } + + return path.toLowerCase(); + } +} \ No newline at end of file diff --git
a/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java b/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java new file mode 100644 index 0000000..ce688e4 --- /dev/null +++ b/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * State shared by the path-extraction helpers in {@link AtlasPathExtractorUtil}. It holds:
+ * <ul>
+ *   <li>the metadata namespace used when building qualified names;</li>
+ *   <li>a qualifiedName-to-entity cache of the entities created so far;</li>
+ *   <li>the options that control how paths are converted.</li>
+ * </ul>
+ */
+public class PathExtractorContext {
+    private final String                   metadataNamespace;
+    private final Map<String, AtlasEntity> knownEntities;
+    private final boolean                  isConvertPathToLowerCase;
+    private final String                   awsS3AtlasModelVersion;
+
+    /**
+     * Creates a context with an empty entity cache, no lower-case conversion and no AWS S3 model version.
+     *
+     * @param metadataNamespace namespace used when building the qualified names of extracted entities
+     */
+    public PathExtractorContext(String metadataNamespace) {
+        this(metadataNamespace, new HashMap<>(), false, null);
+    }
+
+    /**
+     * Creates a context with an empty entity cache and no lower-case conversion.
+     *
+     * @param metadataNamespace      namespace used when building the qualified names of extracted entities
+     * @param awsS3AtlasModelVersion AWS S3 Atlas model version to use; may be {@code null}
+     */
+    public PathExtractorContext(String metadataNamespace, String awsS3AtlasModelVersion) {
+        this(metadataNamespace, new HashMap<>(), false, awsS3AtlasModelVersion);
+    }
+
+    /**
+     * Creates a context with an empty entity cache.
+     *
+     * @param metadataNamespace        namespace used when building the qualified names of extracted entities
+     * @param isConvertPathToLowerCase whether extracted paths should be converted to lower case
+     * @param awsS3AtlasModelVersion   AWS S3 Atlas model version to use; may be {@code null}
+     */
+    public PathExtractorContext(String metadataNamespace, boolean isConvertPathToLowerCase, String awsS3AtlasModelVersion) {
+        this(metadataNamespace, new HashMap<>(), isConvertPathToLowerCase, awsS3AtlasModelVersion);
+    }
+
+    /**
+     * Creates a context backed by the given entity cache.
+     *
+     * @param metadataNamespace        namespace used when building the qualified names of extracted entities
+     * @param knownEntities            qualifiedName-to-entity cache. It is stored as-is (not copied), so entities
+     *                                 added during extraction are visible through the caller's map. A {@code null}
+     *                                 value is replaced with a new empty map.
+     * @param isConvertPathToLowerCase whether extracted paths should be converted to lower case
+     * @param awsS3AtlasModelVersion   AWS S3 Atlas model version to use; may be {@code null}
+     */
+    public PathExtractorContext(String metadataNamespace, Map<String, AtlasEntity> knownEntities, boolean isConvertPathToLowerCase, String awsS3AtlasModelVersion) {
+        this.metadataNamespace        = metadataNamespace;
+        // a null cache would otherwise only surface later, as an NPE from putEntity()/getEntity()
+        this.knownEntities            = knownEntities != null ? knownEntities : new HashMap<>();
+        this.isConvertPathToLowerCase = isConvertPathToLowerCase;
+        this.awsS3AtlasModelVersion   = awsS3AtlasModelVersion;
+    }
+
+    /** @return the namespace used when building qualified names */
+    public String getMetadataNamespace() {
+        return metadataNamespace;
+    }
+
+    /** @return the live entity cache (not a copy); changes made through it affect this context */
+    public Map<String, AtlasEntity> getKnownEntities() {
+        return knownEntities;
+    }
+
+    /**
+     * Caches an entity under its qualified name, replacing any entity already cached under that name.
+     *
+     * @param qualifiedName cache key
+     * @param entity        entity to cache
+     */
+    public void putEntity(String qualifiedName, AtlasEntity entity) {
+        knownEntities.put(qualifiedName, entity);
+    }
+
+    /**
+     * @param qualifiedName cache key
+     * @return the entity cached under {@code qualifiedName}, or {@code null} if there is none
+     */
+    public AtlasEntity getEntity(String qualifiedName) {
+        return knownEntities.get(qualifiedName);
+    }
+
+    /** @return whether extracted paths should be converted to lower case */
+    public boolean isConvertPathToLowerCase() {
+        return isConvertPathToLowerCase;
+    }
+
+    /** @return the configured AWS S3 Atlas model version; may be {@code null} */
+    public String getAwsS3AtlasModelVersion() {
+        return awsS3AtlasModelVersion;
+    }
+}
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java new file mode 100644 index 0000000..664bfb7 --- /dev/null +++ b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+import org.apache.hadoop.fs.Path;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+/**
+ * Tests {@link AtlasPathExtractorUtil#getPathEntity} for Ozone (ofs:// and o3fs://) and HDFS paths. Each test checks:
+ * <ul>
+ *   <li>the returned entity;</li>
+ *   <li>the referred entities attached to it;</li>
+ *   <li>the entities cached in the {@link PathExtractorContext}.</li>
+ * </ul>
+ */
+public class AtlasPathExtractorUtilTest {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasPathExtractorUtilTest.class);
+
+    // Common
+    private static final String METADATA_NAMESPACE       = "metaspace";
+    private static final String QNAME_METADATA_NAMESPACE = '@' + METADATA_NAMESPACE;
+    private static final String SCHEME_SEPARATOR         = "://";
+    private static final String ATTRIBUTE_NAME           = "name";
+    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+    // HDFS entity type and attributes
+    private static final String HDFS_PATH_TYPE         = "hdfs_path";
+    private static final String ATTRIBUTE_PATH         = "path";
+    private static final String ATTRIBUTE_CLUSTER_NAME = "clusterName";
+
+    // Ozone
+    private static final String OZONE_VOLUME   = "ozone_volume";
+    private static final String OZONE_BUCKET   = "ozone_bucket";
+    private static final String OZONE_KEY      = "ozone_key";
+    private static final String OZONE_SCHEME   = "ofs" + SCHEME_SEPARATOR;
+    private static final String OZONE_3_SCHEME = "o3fs" + SCHEME_SEPARATOR;
+    private static final String OZONE_PATH     = OZONE_SCHEME + "bucket1.volume1.ozone1/files/file.txt";
+    private static final String OZONE_3_PATH   = OZONE_3_SCHEME + "bucket1.volume1.ozone1/files/file.txt";
+
+    // HDFS paths
+    private static final String HDFS_SCHEME = "hdfs" + SCHEME_SEPARATOR;
+    private static final String HDFS_PATH   = HDFS_SCHEME + "host_name:8020/warehouse/tablespace/external/hive/taBlE_306";
+
+    @Test
+    public void testGetPathEntityOzone3Path() {
+        verifyOzonePath(OZONE_3_SCHEME, OZONE_3_PATH);
+    }
+
+    @Test
+    public void testGetPathEntityOzonePath() {
+        verifyOzonePath(OZONE_SCHEME, OZONE_PATH);
+    }
+
+    @Test
+    public void testGetPathEntityHdfsPath() {
+        verifyHdfsPath(new PathExtractorContext(METADATA_NAMESPACE), false);
+    }
+
+    @Test
+    public void testGetPathEntityHdfsPathLowerCase() {
+        verifyHdfsPath(new PathExtractorContext(METADATA_NAMESPACE, true, null), true);
+    }
+
+    /** Extracts an Ozone path and checks the key entity, its referred volume/bucket entities and the context cache. */
+    private void verifyOzonePath(String scheme, String strPath) {
+        PathExtractorContext   extractorContext  = new PathExtractorContext(METADATA_NAMESPACE);
+        AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(new Path(strPath), extractorContext);
+        AtlasEntity            entity            = entityWithExtInfo.getEntity();
+
+        assertNotNull(entity);
+        assertEquals(entity.getTypeName(), OZONE_KEY);
+        verifyOzoneKeyEntity(strPath, entity);
+
+        // volume and bucket are attached to the key as referred entities
+        assertEquals(entityWithExtInfo.getReferredEntities().size(), 2);
+        verifyOzoneEntities(scheme, strPath, entityWithExtInfo.getReferredEntities());
+
+        // key, volume and bucket are all cached in the context
+        assertEquals(extractorContext.getKnownEntities().size(), 3);
+        verifyOzoneEntities(scheme, strPath, extractorContext.getKnownEntities());
+    }
+
+    /** Extracts HDFS_PATH with the given context and checks the hdfs_path entity, referred entities and the cache. */
+    private void verifyHdfsPath(PathExtractorContext extractorContext, boolean toLowerCase) {
+        AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(new Path(HDFS_PATH), extractorContext);
+        AtlasEntity            entity            = entityWithExtInfo.getEntity();
+
+        assertNotNull(entity);
+        assertEquals(entity.getTypeName(), HDFS_PATH_TYPE);
+        verifyHDFSEntity(entity, toLowerCase);
+
+        // check the extraction result itself, not an unrelated, freshly constructed ext-info object
+        assertNull(entityWithExtInfo.getReferredEntities());
+        assertEquals(extractorContext.getKnownEntities().size(), 1);
+        extractorContext.getKnownEntities().values().forEach(x -> verifyHDFSEntity(x, toLowerCase));
+    }
+
+    /** Verifies every entity in {@code entities} by type; any type other than key/volume/bucket fails the test. */
+    private void verifyOzoneEntities(String scheme, String path, Map<String, AtlasEntity> entities) {
+        for (AtlasEntity entity : entities.values()) {
+            switch (entity.getTypeName()) {
+                case OZONE_KEY:
+                    verifyOzoneKeyEntity(path, entity);
+                    break;
+
+                case OZONE_VOLUME:
+                    assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "volume1" + QNAME_METADATA_NAMESPACE);
+                    assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "volume1");
+                    break;
+
+                case OZONE_BUCKET:
+                    assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "volume1.bucket1" + QNAME_METADATA_NAMESPACE);
+                    assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "bucket1");
+                    break;
+
+                default:
+                    fail("unexpected entity type: " + entity.getTypeName());
+            }
+        }
+    }
+
+    private void verifyOzoneKeyEntity(String path, AtlasEntity entity) {
+        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), path + QNAME_METADATA_NAMESPACE);
+        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "/files/file.txt");
+    }
+
+    private void verifyHDFSEntity(AtlasEntity entity, boolean toLowerCase) {
+        String expectedPath = toLowerCase ? HDFS_PATH.toLowerCase() : HDFS_PATH;
+        String expectedName = toLowerCase ? "/warehouse/tablespace/external/hive/table_306"
+                                          : "/warehouse/tablespace/external/hive/taBlE_306";
+
+        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), expectedPath + QNAME_METADATA_NAMESPACE);
+        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), expectedName);
+        assertEquals(entity.getAttribute(ATTRIBUTE_PATH), expectedPath);
+        assertEquals(entity.getAttribute(ATTRIBUTE_CLUSTER_NAME), METADATA_NAMESPACE);
+    }
+}