This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b50ac0  ATLAS-3836 Add Apache Ozone support in hive hook
8b50ac0 is described below

commit 8b50ac0c7d8eba8efa0deb97f1fee68d382f11ad
Author: Nikhil Bonte <nikhil.bo...@freestoneinfotech.com>
AuthorDate: Fri Jun 26 12:20:09 2020 +0530

    ATLAS-3836 Add Apache Ozone support in hive hook
    
    Signed-off-by: Sarath Subramanian <sar...@apache.org>
---
 .../atlas/hive/hook/AtlasHiveHookContext.java      |   5 +-
 .../java/org/apache/atlas/hive/hook/HiveHook.java  |   8 +-
 .../atlas/hive/hook/events/BaseHiveEvent.java      | 309 +------------
 .../apache/atlas/utils/AtlasPathExtractorUtil.java | 493 +++++++++++++++++++++
 .../apache/atlas/utils/PathExtractorContext.java   |  74 ++++
 .../atlas/utils/AtlasPathExtractorUtilTest.java    | 176 ++++++++
 6 files changed, 763 insertions(+), 302 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index d0b9393..1286471 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -157,6 +157,7 @@ public class AtlasHiveHookContext {
 
     public Collection<AtlasEntity> getEntities() { return 
qNameEntityMap.values(); }
 
+    public Map<String, AtlasEntity> getQNameToEntityMap() { return 
qNameEntityMap; }
 
     public String getMetadataNamespace() {
         return hook.getMetadataNamespace();
@@ -168,8 +169,8 @@ public class AtlasHiveHookContext {
         return hook.isConvertHdfsPathToLowerCase();
     }
 
-    public boolean isAwsS3AtlasModelVersionV2() {
-        return hook.isAwsS3AtlasModelVersionV2();
+    public String getAwsS3AtlasModelVersion() {
+        return hook.getAwsS3AtlasModelVersion();
     }
 
     public boolean getSkipHiveColumnLineageHive20633() {
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 3aa5c3b..6513234 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -77,7 +77,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final int     nameCacheDatabaseMaxCount;
     private static final int     nameCacheTableMaxCount;
     private static final int     nameCacheRebuildIntervalSeconds;
-    private static final boolean isAwsS3AtlasModelVersionV2;
+    private static final String  awsS3AtlasModelVersion;
 
     private static final boolean                       
skipHiveColumnLineageHive20633;
     private static final int                           
skipHiveColumnLineageHive20633InputsThreshold;
@@ -101,7 +101,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         nameCacheDatabaseMaxCount       = 
atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
         nameCacheTableMaxCount          = 
atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
         nameCacheRebuildIntervalSeconds = 
atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 
minutes default
-        isAwsS3AtlasModelVersionV2      = 
StringUtils.equalsIgnoreCase(atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION,
 HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2), HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
+        awsS3AtlasModelVersion          = 
atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION, 
HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
         skipHiveColumnLineageHive20633                = 
atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
         skipHiveColumnLineageHive20633InputsThreshold = 
atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
         hiveProcessPopulateDeprecatedAttributes       = 
atlasProperties.getBoolean(HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES, 
false);
@@ -257,7 +257,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return convertHdfsPathToLowerCase;
     }
 
-    public boolean isAwsS3AtlasModelVersionV2() { return 
isAwsS3AtlasModelVersionV2; }
+    public String getAwsS3AtlasModelVersion() {
+        return awsS3AtlasModelVersion;
+    }
 
     public boolean getSkipHiveColumnLineageHive20633() {
         return skipHiveColumnLineageHive20633;
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 6986ba1..2b809a6 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -20,6 +20,7 @@ package org.apache.atlas.hive.hook.events;
 
 import org.apache.atlas.hive.hook.AtlasHiveHookContext;
 import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
+import org.apache.atlas.utils.PathExtractorContext;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -29,6 +30,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
 import org.apache.atlas.utils.HdfsNameServiceResolver;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -73,25 +75,8 @@ public abstract class BaseHiveEvent {
     public static final String HIVE_TYPE_PROCESS_EXECUTION         = 
"hive_process_execution";
     public static final String HIVE_DB_DDL                         = 
"hive_db_ddl";
     public static final String HIVE_TABLE_DDL                      = 
"hive_table_ddl";
-    public static final String HDFS_TYPE_PATH                      = 
"hdfs_path";
     public static final String HBASE_TYPE_TABLE                    = 
"hbase_table";
     public static final String HBASE_TYPE_NAMESPACE                = 
"hbase_namespace";
-    public static final String AWS_S3_BUCKET                       = 
"aws_s3_bucket";
-    public static final String AWS_S3_PSEUDO_DIR                   = 
"aws_s3_pseudo_dir";
-    public static final String AWS_S3_OBJECT                       = 
"aws_s3_object";
-    public static final String AWS_S3_V2_BUCKET                    = 
"aws_s3_v2_bucket";
-    public static final String AWS_S3_V2_PSEUDO_DIR                = 
"aws_s3_v2_directory";
-    public static final String ADLS_GEN2_ACCOUNT                   = 
"adls_gen2_account";
-    public static final String ADLS_GEN2_CONTAINER                 = 
"adls_gen2_container";
-    public static final String ADLS_GEN2_DIRECTORY                 = 
"adls_gen2_directory";
-    public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX       = 
".dfs.core.windows.net";
-
-    public static final String SCHEME_SEPARATOR                    = "://";
-    public static final String S3_SCHEME                           = "s3" + 
SCHEME_SEPARATOR;
-    public static final String S3A_SCHEME                          = "s3a" + 
SCHEME_SEPARATOR;
-    public static final String ABFS_SCHEME                         = "abfs" + 
SCHEME_SEPARATOR;
-    public static final String ABFSS_SCHEME                        = "abfss" + 
SCHEME_SEPARATOR;
-
     public static final String ATTRIBUTE_QUALIFIED_NAME            = 
"qualifiedName";
     public static final String ATTRIBUTE_NAME                      = "name";
     public static final String ATTRIBUTE_DESCRIPTION               = 
"description";
@@ -145,16 +130,10 @@ public abstract class BaseHiveEvent {
     public static final String ATTRIBUTE_URI                       = "uri";
     public static final String ATTRIBUTE_STORAGE_HANDLER           = 
"storage_handler";
     public static final String ATTRIBUTE_NAMESPACE                 = 
"namespace";
-    public static final String ATTRIBUTE_OBJECT_PREFIX             = 
"objectPrefix";
-    public static final String ATTRIBUTE_BUCKET                    = "bucket";
-    public static final String ATTRIBUTE_CONTAINER                 = 
"container";
     public static final String ATTRIBUTE_HOSTNAME                  = 
"hostName";
     public static final String ATTRIBUTE_EXEC_TIME                 = 
"execTime";
     public static final String ATTRIBUTE_DDL_QUERIES               = 
"ddlQueries";
     public static final String ATTRIBUTE_SERVICE_TYPE              = 
"serviceType";
-    public static final String ATTRIBUTE_ACCOUNT                   = "account";
-    public static final String ATTRIBUTE_PARENT                    = "parent";
-
     public static final String HBASE_STORAGE_HANDLER_CLASS         = 
"org.apache.hadoop.hive.hbase.HBaseStorageHandler";
     public static final String HBASE_DEFAULT_NAMESPACE             = "default";
     public static final String HBASE_NAMESPACE_TABLE_DELIMITER     = ":";
@@ -170,14 +149,10 @@ public abstract class BaseHiveEvent {
     public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS          = 
"hive_table_partitionkeys";
     public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS            = 
"hive_table_columns";
     public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC       = 
"hive_table_storagedesc";
-    public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS  = 
"aws_s3_bucket_aws_s3_pseudo_dirs";
-    public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = 
"aws_s3_v2_container_contained";
     public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE      = 
"hive_process_process_executions";
     public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES           = 
"hive_db_ddl_queries";
     public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES        = 
"hive_table_ddl_queries";
     public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE         = 
"hbase_table_namespace";
-    public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS  = 
"adls_gen2_account_containers";
-    public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN     = 
"adls_gen2_parent_children";
 
 
     public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new 
HashMap<>();
@@ -584,52 +559,21 @@ public abstract class BaseHiveEvent {
     }
 
     protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) 
{
-            AtlasEntity ret;
-        String      strPath           = path.toString();
-        String      metadataNamespace = getMetadataNamespace();
-
-        if (strPath.startsWith(HDFS_PATH_PREFIX) && 
context.isConvertHdfsPathToLowerCase()) {
-            strPath = strPath.toLowerCase();
-        }
-
-        if (isS3Path(strPath)) {
-            if (context.isAwsS3AtlasModelVersionV2()) {
-                ret = addS3PathEntityV2(path, strPath, extInfo);
-            } else {
-                ret = addS3PathEntityV1(path, strPath, extInfo);
-            }
-        } else if (isAbfsPath(strPath)) {
-            ret = addAbfsPathEntity(path, strPath, extInfo);
-        } else {
-            String nameServiceID     = 
HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
-            String attrPath          = StringUtils.isEmpty(nameServiceID) ? 
strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
-            String pathQualifiedName = getQualifiedName(attrPath);
-
-            ret = context.getEntity(pathQualifiedName);
-
-            if (ret == null) {
-                ret = new AtlasEntity(HDFS_TYPE_PATH);
-
-                if (StringUtils.isNotEmpty(nameServiceID)) {
-                    ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
-                }
-
-                String name = 
Path.getPathWithoutSchemeAndAuthority(path).toString();
+        String               strPath                  = path.toString();
+        String               metadataNamespace        = getMetadataNamespace();
+        boolean              isConvertPathToLowerCase = 
strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase();
+        PathExtractorContext pathExtractorContext     = new 
PathExtractorContext(metadataNamespace, context.getQNameToEntityMap(),
+                                                                               
  isConvertPathToLowerCase, context.getAwsS3AtlasModelVersion());
 
-                if (strPath.startsWith(HDFS_PATH_PREFIX) && 
context.isConvertHdfsPathToLowerCase()) {
-                    name = name.toLowerCase();
-                }
-
-                ret.setAttribute(ATTRIBUTE_PATH, attrPath);
-                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
-                ret.setAttribute(ATTRIBUTE_NAME, name);
-                ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
+        AtlasEntityWithExtInfo entityWithExtInfo = 
AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext);
 
-                context.putEntity(pathQualifiedName, ret);
+        if (entityWithExtInfo.getReferredEntities() != null){
+            for (AtlasEntity entity : 
entityWithExtInfo.getReferredEntities().values()) {
+                extInfo.addReferredEntity(entity);
             }
         }
 
-        return ret;
+        return entityWithExtInfo.getEntity();
     }
 
     protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, 
List<AtlasEntity> outputs) throws Exception {
@@ -1145,235 +1089,6 @@ public abstract class BaseHiveEvent {
         return false;
     }
 
-    private boolean isS3Path(String strPath) {
-        return strPath != null && (strPath.startsWith(S3_SCHEME) || 
strPath.startsWith(S3A_SCHEME));
-    }
-
-    private boolean isAbfsPath(String strPath) {
-        return strPath != null && (strPath.startsWith(ABFS_SCHEME) || 
strPath.startsWith(ABFSS_SCHEME));
-    }
-
-    private AtlasEntity addS3PathEntityV1(Path path, String strPath, 
AtlasEntityExtInfo extInfo) {
-        String      metadataNamespace   = getMetadataNamespace();
-        String      bucketName          = path.toUri().getAuthority();
-        String      bucketQualifiedName = (path.toUri().getScheme() + 
SCHEME_SEPARATOR + path.toUri().getAuthority() + 
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
-        String      pathQualifiedName   = (strPath + 
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
-        AtlasEntity bucketEntity        = 
context.getEntity(bucketQualifiedName);
-        AtlasEntity ret                 = context.getEntity(pathQualifiedName);
-
-        if (ret == null) {
-            if (bucketEntity == null) {
-                bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
-
-                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
bucketQualifiedName);
-                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
-
-                context.putEntity(bucketQualifiedName, bucketEntity);
-            }
-
-            extInfo.addReferredEntity(bucketEntity);
-
-            ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
-
-            ret.setRelationshipAttribute(ATTRIBUTE_BUCKET, 
AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, 
RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
-            ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, 
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
-            ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
-            ret.setAttribute(ATTRIBUTE_NAME, 
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
-
-            context.putEntity(pathQualifiedName, ret);
-        }
-
-        return ret;
-    }
-
-    private AtlasEntity addS3PathEntityV2(Path path, String strPath, 
AtlasEntityExtInfo extInfo) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
-        }
-
-        String      metadataNamespace = getMetadataNamespace();
-        String      pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE 
+ metadataNamespace;
-        AtlasEntity ret               = context.getEntity(pathQualifiedName);
-
-        if (ret == null) {
-            String      bucketName          = path.toUri().getAuthority();
-            String      schemeAndBucketName = (path.toUri().getScheme() + 
SCHEME_SEPARATOR + bucketName).toLowerCase();
-            String      bucketQualifiedName = schemeAndBucketName + 
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
-            AtlasEntity bucketEntity        = 
context.getEntity(bucketQualifiedName);
-
-            if (bucketEntity == null) {
-                bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET);
-
-                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
bucketQualifiedName);
-                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("adding entity: typeName={}, qualifiedName={}", 
bucketEntity.getTypeName(), 
bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
-                }
-
-                context.putEntity(bucketQualifiedName, bucketEntity);
-            }
-
-            extInfo.addReferredEntity(bucketEntity);
-
-            AtlasRelatedObjectId parentObjId = 
AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, 
RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
-            String               parentPath  = Path.SEPARATOR;
-            String               dirPath     = path.toUri().getPath();
-
-            if (StringUtils.isEmpty(dirPath)) {
-                dirPath = Path.SEPARATOR;
-            }
-
-            for (String subDirName : dirPath.split(Path.SEPARATOR)) {
-                if (StringUtils.isEmpty(subDirName)) {
-                    continue;
-                }
-
-                String subDirPath          = parentPath + subDirName + 
Path.SEPARATOR;
-                String subDirQualifiedName = schemeAndBucketName + subDirPath 
+ QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
-
-                ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
-
-                ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
-                ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
-                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
subDirQualifiedName);
-                ret.setAttribute(ATTRIBUTE_NAME, subDirName);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("adding entity: typeName={}, qualifiedName={}", 
ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
-                }
-
-                context.putEntity(subDirQualifiedName, ret);
-
-                parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, 
RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
-                parentPath  = subDirPath;
-            }
-
-            if (ret == null) {
-                ret = bucketEntity;
-            }
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
-        }
-
-        return ret;
-    }
-
-    private AtlasEntity addAbfsPathEntity(Path path, String strPath, 
AtlasEntityExtInfo extInfo) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
-        }
-
-        String      metadataNamespace = getMetadataNamespace();
-        String      pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE 
+ metadataNamespace;
-        AtlasEntity ret               = context.getEntity(pathQualifiedName);
-
-        if (ret == null) {
-            String      abfsScheme               = path.toUri().getScheme();
-            String      storageAcctName          = 
getAbfsStorageAccountName(path.toUri());
-            String      schemeAndStorageAcctName = (abfsScheme + 
SCHEME_SEPARATOR + storageAcctName).toLowerCase();
-            String      storageAcctQualifiedName = schemeAndStorageAcctName + 
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
-            AtlasEntity storageAcctEntity        = 
context.getEntity(storageAcctQualifiedName);
-
-            // create adls-gen2 storage-account entity
-            if (storageAcctEntity == null) {
-                storageAcctEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT);
-
-                storageAcctEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
storageAcctQualifiedName);
-                storageAcctEntity.setAttribute(ATTRIBUTE_NAME, 
storageAcctName);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("adding entity: typeName={}, qualifiedName={}", 
storageAcctEntity.getTypeName(), 
storageAcctEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
-                }
-
-                context.putEntity(storageAcctQualifiedName, storageAcctEntity);
-            }
-
-            extInfo.addReferredEntity(storageAcctEntity);
-
-            AtlasRelatedObjectId storageAcctObjId = 
AtlasTypeUtil.getAtlasRelatedObjectId(storageAcctEntity, 
RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);
-
-            // create adls-gen2 container entity linking to storage account
-            String      containerName          = path.toUri().getUserInfo();
-            String      schemeAndContainerName = (abfsScheme + 
SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + 
storageAcctName).toLowerCase();
-            String      containerQualifiedName = schemeAndContainerName + 
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
-            AtlasEntity containerEntity        = 
context.getEntity(containerQualifiedName);
-
-            if (containerEntity == null) {
-                containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER);
-
-                containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
containerQualifiedName);
-                containerEntity.setAttribute(ATTRIBUTE_NAME, containerName);
-                containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, 
storageAcctObjId);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("adding entity: typeName={}, qualifiedName={}", 
containerEntity.getTypeName(), 
containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
-                }
-
-                context.putEntity(containerQualifiedName, containerEntity);
-            }
-
-            extInfo.addReferredEntity(containerEntity);
-
-            // create adls-gen2 directory entity linking to container
-            AtlasRelatedObjectId parentObjId = 
AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity, 
RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
-            String               parentPath  = Path.SEPARATOR;
-            String               dirPath     = path.toUri().getPath();
-
-            if (StringUtils.isEmpty(dirPath)) {
-                dirPath = Path.SEPARATOR;
-            }
-
-            for (String subDirName : dirPath.split(Path.SEPARATOR)) {
-                if (StringUtils.isEmpty(subDirName)) {
-                    continue;
-                }
-
-                String subDirPath          = parentPath + subDirName + 
Path.SEPARATOR;
-                String subDirQualifiedName = schemeAndContainerName + 
subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
-
-                ret = new AtlasEntity(ADLS_GEN2_DIRECTORY);
-
-                ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId);
-                ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
subDirQualifiedName);
-                ret.setAttribute(ATTRIBUTE_NAME, subDirName);
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("adding entity: typeName={}, qualifiedName={}", 
ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
-                }
-
-                context.putEntity(subDirQualifiedName, ret);
-
-                parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, 
RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
-                parentPath  = subDirPath;
-            }
-
-            if (ret == null) {
-                ret = storageAcctEntity;
-            }
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
-        }
-
-        return ret;
-    }
-
-    private String getAbfsStorageAccountName(URI uri) {
-        String ret  = null;
-        String host = uri.getHost();
-
-        // host: "<account_name>.dfs.core.windows.net"
-        if (StringUtils.isNotEmpty(host) && 
host.contains(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)) {
-            ret = host.substring(0, 
host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX));
-        }
-
-        return ret;
-    }
-
     static final class EntityComparator implements Comparator<Entity> {
         @Override
         public int compare(Entity entity1, Entity entity2) {
diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
new file mode 100644
index 0000000..c3276a8
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class AtlasPathExtractorUtil {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasPathExtractorUtil.class);
+
+    // Common: qualified-name separators and attribute names used across the path types below
+    public static final char   QNAME_SEP_METADATA_NAMESPACE = '@';
+    public static final char   QNAME_SEP_ENTITY_NAME        = '.';
+    public static final String SCHEME_SEPARATOR             = "://";
+    public static final String ATTRIBUTE_QUALIFIED_NAME     = "qualifiedName";
+    public static final String ATTRIBUTE_NAME               = "name";
+    public static final String ATTRIBUTE_BUCKET             = "bucket";
+
+    // HDFS: entity type name and attribute names set on hdfs_path entities
+    public static final String HDFS_TYPE_PATH           = "hdfs_path";
+    public static final String ATTRIBUTE_PATH           = "path";
+    public static final String ATTRIBUTE_CLUSTER_NAME   = "clusterName";
+    public static final String ATTRIBUTE_NAMESERVICE_ID = "nameServiceId";
+
+    // AWS S3: type, scheme, attribute and relationship names for the v1 and v2 S3 models
+    public static final String AWS_S3_ATLAS_MODEL_VERSION_V2              = 
"v2";
+    public static final String AWS_S3_BUCKET                              = 
"aws_s3_bucket";
+    public static final String AWS_S3_PSEUDO_DIR                          = 
"aws_s3_pseudo_dir";
+    public static final String AWS_S3_V2_BUCKET                           = 
"aws_s3_v2_bucket";
+    public static final String AWS_S3_V2_PSEUDO_DIR                       = 
"aws_s3_v2_directory";
+    public static final String S3_SCHEME                                  = 
"s3" + SCHEME_SEPARATOR;
+    public static final String S3A_SCHEME                                 = 
"s3a" + SCHEME_SEPARATOR;
+    public static final String ATTRIBUTE_CONTAINER                        = 
"container";
+    public static final String ATTRIBUTE_OBJECT_PREFIX                    = 
"objectPrefix";
+    public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS  = 
"aws_s3_bucket_aws_s3_pseudo_dirs";
+    public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED = 
"aws_s3_v2_container_contained";
+
+    // ADLS Gen2: type, scheme, attribute and relationship names for abfs/abfss paths
+    public static final String ADLS_GEN2_ACCOUNT                          = 
"adls_gen2_account";
+    public static final String ADLS_GEN2_CONTAINER                        = 
"adls_gen2_container";
+    public static final String ADLS_GEN2_DIRECTORY                        = 
"adls_gen2_directory";
+    public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX              = 
".dfs.core.windows.net";
+    public static final String ABFS_SCHEME                                = 
"abfs" + SCHEME_SEPARATOR;
+    public static final String ABFSS_SCHEME                               = 
"abfss" + SCHEME_SEPARATOR;
+    public static final String ATTRIBUTE_ACCOUNT                          = 
"account";
+    public static final String ATTRIBUTE_PARENT                           = 
"parent";
+    public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS  = 
"adls_gen2_account_containers";
+    public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN     = 
"adls_gen2_parent_children";
+
+    // Ozone: type, scheme, attribute and relationship names for ofs/o3fs paths
+    public static final String OZONE_VOLUME                     = 
"ozone_volume";
+    public static final String OZONE_BUCKET                     = 
"ozone_bucket";
+    public static final String OZONE_KEY                        = "ozone_key";
+    public static final String OZONE_SCHEME                     = "ofs" + 
SCHEME_SEPARATOR;
+    public static final String OZONE_3_SCHEME                   = "o3fs" + 
SCHEME_SEPARATOR;
+    public static final String ATTRIBUTE_VOLUME                 = "volume";
+    public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET = 
"ozone_volume_buckets";
+    public static final String RELATIONSHIP_OZONE_BUCKET_KEY    = 
"ozone_bucket_keys";
+
+    /**
+     * Creates (or fetches from the context) the Atlas entity that represents the given path.
+     *
+     * The entity model is picked from the path's scheme prefix: S3 (v1 or v2, depending on the
+     * model version held by the context), ABFS, Ozone, and HDFS for everything else.
+     *
+     * @param path    path to build an entity for
+     * @param context supplies the metadata namespace, lower-casing flag, S3 model version and entity cache
+     * @return a wrapper whose entity is the path entity; the S3, ABFS and Ozone handlers also
+     *         add their parent entities (bucket, account, container, volume) as referred entities
+     */
+    public static AtlasEntityWithExtInfo getPathEntity(Path path, PathExtractorContext context) {
+        final AtlasEntityWithExtInfo result      = new AtlasEntityWithExtInfo();
+        final String                 rawPath     = path.toString();
+        // the lower-cased copy is only used to recognise the scheme; handlers receive the original path
+        final String                 schemeProbe = context.isConvertPathToLowerCase() ? rawPath.toLowerCase() : rawPath;
+        final AtlasEntity            pathEntity;
+
+        if (isS3Path(schemeProbe)) {
+            if (isAwsS3AtlasModelVersionV2(context)) {
+                pathEntity = addS3PathEntityV2(path, result, context);
+            } else {
+                pathEntity = addS3PathEntityV1(path, result, context);
+            }
+        } else if (isAbfsPath(schemeProbe)) {
+            pathEntity = addAbfsPathEntity(path, result, context);
+        } else if (isOzonePath(schemeProbe)) {
+            pathEntity = addOzonePathEntity(path, result, context);
+        } else {
+            pathEntity = addHDFSPathEntity(path, context);
+        }
+
+        result.setEntity(pathEntity);
+
+        return result;
+    }
+
+    /**
+     * Tells whether the context asks for the v2 AWS S3 model.
+     *
+     * @return true only when the configured model version is non-empty and equals "v2", ignoring case
+     */
+    private static boolean isAwsS3AtlasModelVersionV2(PathExtractorContext context) {
+        final String modelVersion = context.getAwsS3AtlasModelVersion();
+
+        if (StringUtils.isEmpty(modelVersion)) {
+            return false;
+        }
+
+        return StringUtils.equalsIgnoreCase(modelVersion, AWS_S3_ATLAS_MODEL_VERSION_V2);
+    }
+
+    /** Returns true when the (non-null) path string starts with the "s3://" or "s3a://" scheme prefix. */
+    private static boolean isS3Path(String strPath) {
+        if (strPath == null) {
+            return false;
+        }
+
+        return strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME);
+    }
+
+    /** Returns true when the (non-null) path string starts with the "abfs://" or "abfss://" scheme prefix. */
+    private static boolean isAbfsPath(String strPath) {
+        if (strPath == null) {
+            return false;
+        }
+
+        return strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME);
+    }
+
+    /** Returns true when the (non-null) path string starts with the "ofs://" or "o3fs://" scheme prefix. */
+    private static boolean isOzonePath(String strPath) {
+        if (strPath == null) {
+            return false;
+        }
+
+        return strPath.startsWith(OZONE_SCHEME) || strPath.startsWith(OZONE_3_SCHEME);
+    }
+
+    /**
+     * Builds the v1 S3 model for a path: an aws_s3_bucket entity plus one aws_s3_pseudo_dir entity.
+     *
+     * Both qualified names are lower-cased (except for the metadata namespace suffix). When the
+     * pseudo-dir is not yet in the context, the bucket is looked up or created, attached to extInfo
+     * as a referred entity, and the new pseudo-dir is cached in the context.
+     *
+     * @return the cached or newly created pseudo-dir entity for the path
+     */
+    private static AtlasEntity addS3PathEntityV1(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
+        final String strPath = path.toString();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addS3PathEntityV1(strPath={})", strPath);
+        }
+
+        final URI    uri          = path.toUri();
+        final String namespace    = context.getMetadataNamespace();
+        final String bucketName   = uri.getAuthority();
+        final String bucketQName  = (uri.getScheme() + SCHEME_SEPARATOR + bucketName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;
+        final String pathQName    = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;
+        AtlasEntity  bucketEntity = context.getEntity(bucketQName);
+        AtlasEntity  dirEntity    = context.getEntity(pathQName);
+
+        if (dirEntity == null) {
+            if (bucketEntity == null) {
+                bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
+
+                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQName);
+                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+                context.putEntity(bucketQName, bucketEntity);
+            }
+
+            extInfo.addReferredEntity(bucketEntity);
+
+            // the same lower-cased, scheme-less path serves as both objectPrefix and name
+            final String objectPrefix = Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase();
+
+            dirEntity = new AtlasEntity(AWS_S3_PSEUDO_DIR);
+
+            dirEntity.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
+            dirEntity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix);
+            dirEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQName);
+            dirEntity.setAttribute(ATTRIBUTE_NAME, objectPrefix);
+
+            context.putEntity(pathQName, dirEntity);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addS3PathEntityV1(strPath={})", strPath);
+        }
+
+        return dirEntity;
+    }
+
+    /**
+     * Builds the v2 S3 model for a path: an aws_s3_v2_bucket entity and a chain of
+     * aws_s3_v2_directory entities, one per non-empty path segment.
+     *
+     * The bucket is looked up in (or added to) the context and attached to extInfo as a referred
+     * entity. Each directory is linked to its predecessor (the bucket for the first segment) via
+     * the "container" relationship attribute and stored in the context under its own qualified name.
+     *
+     * @return the entity already cached for the path, otherwise the entity of the last path
+     *         segment, or the bucket entity when the path has no segments
+     */
+    private static AtlasEntity addS3PathEntityV2(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
+        final String strPath = path.toString();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
+        }
+
+        final String namespace  = context.getMetadataNamespace();
+        AtlasEntity  pathEntity = context.getEntity(strPath + QNAME_SEP_METADATA_NAMESPACE + namespace);
+
+        if (pathEntity == null) {
+            final URI    uri          = path.toUri();
+            final String bucketName   = uri.getAuthority();
+            final String bucketPrefix = (uri.getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase();
+            final String bucketQName  = bucketPrefix + QNAME_SEP_METADATA_NAMESPACE + namespace;
+            AtlasEntity  bucketEntity = context.getEntity(bucketQName);
+
+            if (bucketEntity == null) {
+                bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET);
+
+                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQName);
+                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(bucketQName, bucketEntity);
+            }
+
+            extInfo.addReferredEntity(bucketEntity);
+
+            final String         dirPath     = StringUtils.isEmpty(uri.getPath()) ? Path.SEPARATOR : uri.getPath();
+            AtlasRelatedObjectId containerId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
+            String               prefix      = Path.SEPARATOR;
+
+            for (String segment : dirPath.split(Path.SEPARATOR)) {
+                // splitting on the separator yields empty elements (e.g. the leading one); ignore them
+                if (StringUtils.isNotEmpty(segment)) {
+                    final String      dirPrefix = prefix + segment + Path.SEPARATOR;
+                    final String      dirQName  = bucketPrefix + dirPrefix + QNAME_SEP_METADATA_NAMESPACE + namespace;
+                    final AtlasEntity dirEntity = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
+
+                    dirEntity.setRelationshipAttribute(ATTRIBUTE_CONTAINER, containerId);
+                    dirEntity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, dirPrefix);
+                    dirEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dirQName);
+                    dirEntity.setAttribute(ATTRIBUTE_NAME, segment);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("adding entity: typeName={}, qualifiedName={}", dirEntity.getTypeName(), dirEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                    }
+
+                    context.putEntity(dirQName, dirEntity);
+
+                    // the directory just created becomes the container of the next segment
+                    containerId = AtlasTypeUtil.getAtlasRelatedObjectId(dirEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
+                    prefix      = dirPrefix;
+                    pathEntity  = dirEntity;
+                }
+            }
+
+            if (pathEntity == null) {
+                pathEntity = bucketEntity;
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
+        }
+
+        return pathEntity;
+    }
+
+    /**
+     * Builds the ADLS Gen2 model for an abfs/abfss path: storage account, container, and a chain
+     * of adls_gen2_directory entities, one per non-empty path segment.
+     *
+     * The account name comes from the URI host, the container name from the URI user-info. Account
+     * and container are looked up in (or added to) the context and attached to extInfo as referred
+     * entities. Each directory is linked to its predecessor (the container for the first segment)
+     * via the "parent" relationship attribute and stored in the context.
+     *
+     * @return the entity already cached for the path, otherwise the entity of the last path
+     *         segment, or the storage-account entity when the path has no segments
+     */
+    private static AtlasEntity addAbfsPathEntity(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
+        final String strPath = path.toString();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
+        }
+
+        final String namespace  = context.getMetadataNamespace();
+        AtlasEntity  pathEntity = context.getEntity(strPath + QNAME_SEP_METADATA_NAMESPACE + namespace);
+
+        if (pathEntity == null) {
+            final URI    uri           = path.toUri();
+            final String scheme        = uri.getScheme();
+            final String accountName   = getAbfsStorageAccountName(uri);
+            final String accountPrefix = (scheme + SCHEME_SEPARATOR + accountName).toLowerCase();
+            final String accountQName  = accountPrefix + QNAME_SEP_METADATA_NAMESPACE + namespace;
+            AtlasEntity  accountEntity = context.getEntity(accountQName);
+
+            // storage-account entity
+            if (accountEntity == null) {
+                accountEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT);
+
+                accountEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, accountQName);
+                accountEntity.setAttribute(ATTRIBUTE_NAME, accountName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", accountEntity.getTypeName(), accountEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(accountQName, accountEntity);
+            }
+
+            extInfo.addReferredEntity(accountEntity);
+
+            final AtlasRelatedObjectId accountId = AtlasTypeUtil.getAtlasRelatedObjectId(accountEntity, RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);
+
+            // container entity, linked to the storage account
+            final String containerName   = uri.getUserInfo();
+            final String containerPrefix = (scheme + SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE + accountName).toLowerCase();
+            final String containerQName  = containerPrefix + QNAME_SEP_METADATA_NAMESPACE + namespace;
+            AtlasEntity  containerEntity = context.getEntity(containerQName);
+
+            if (containerEntity == null) {
+                containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER);
+
+                containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, containerQName);
+                containerEntity.setAttribute(ATTRIBUTE_NAME, containerName);
+                containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT, accountId);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", containerEntity.getTypeName(), containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(containerQName, containerEntity);
+            }
+
+            extInfo.addReferredEntity(containerEntity);
+
+            // directory entities, the first one linked to the container
+            final String         dirPath  = StringUtils.isEmpty(uri.getPath()) ? Path.SEPARATOR : uri.getPath();
+            AtlasRelatedObjectId parentId = AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
+            String               prefix   = Path.SEPARATOR;
+
+            for (String segment : dirPath.split(Path.SEPARATOR)) {
+                // splitting on the separator yields empty elements (e.g. the leading one); ignore them
+                if (StringUtils.isNotEmpty(segment)) {
+                    final String      dirPrefix = prefix + segment + Path.SEPARATOR;
+                    final String      dirQName  = containerPrefix + dirPrefix + QNAME_SEP_METADATA_NAMESPACE + namespace;
+                    final AtlasEntity dirEntity = new AtlasEntity(ADLS_GEN2_DIRECTORY);
+
+                    dirEntity.setRelationshipAttribute(ATTRIBUTE_PARENT, parentId);
+                    dirEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dirQName);
+                    dirEntity.setAttribute(ATTRIBUTE_NAME, segment);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("adding entity: typeName={}, qualifiedName={}", dirEntity.getTypeName(), dirEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                    }
+
+                    context.putEntity(dirQName, dirEntity);
+
+                    // the directory just created becomes the parent of the next segment
+                    parentId   = AtlasTypeUtil.getAtlasRelatedObjectId(dirEntity, RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
+                    prefix     = dirPrefix;
+                    pathEntity = dirEntity;
+                }
+            }
+
+            if (pathEntity == null) {
+                pathEntity = accountEntity;
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
+        }
+
+        return pathEntity;
+    }
+
+    /**
+     * Builds the Ozone model for a path: ozone_volume, ozone_bucket and one ozone_key entity.
+     *
+     * Volume and bucket are looked up in (or added to) the context and attached to extInfo as
+     * referred entities; the bucket points at the volume through the "volume" relationship
+     * attribute. The key entity takes the URI path as its name, points at the bucket through
+     * the "bucket" relationship attribute, and is cached in the context.
+     *
+     * @return the cached or newly created key entity for the path
+     */
+    private static AtlasEntity addOzonePathEntity(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
+        final String strPath = path.toString();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addOzonePathEntity(strPath={})", strPath);
+        }
+
+        final String namespace = context.getMetadataNamespace();
+        final URI    uri       = path.toUri();
+        final String scheme    = uri.getScheme();
+        final String keyQName  = strPath + QNAME_SEP_METADATA_NAMESPACE + namespace;
+        AtlasEntity  keyEntity = context.getEntity(keyQName);
+
+        if (keyEntity == null) {
+            // volume entity
+            final String volumeName   = getOzoneVolumeName(path);
+            final String volumeUri    = scheme + SCHEME_SEPARATOR + volumeName;
+            final String volumeQName  = volumeUri + QNAME_SEP_METADATA_NAMESPACE + namespace;
+            AtlasEntity  volumeEntity = context.getEntity(volumeQName);
+
+            if (volumeEntity == null) {
+                volumeEntity = new AtlasEntity(OZONE_VOLUME);
+
+                volumeEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, volumeQName);
+                volumeEntity.setAttribute(ATTRIBUTE_NAME, volumeName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", volumeEntity.getTypeName(), volumeEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(volumeQName, volumeEntity);
+            }
+
+            extInfo.addReferredEntity(volumeEntity);
+
+            // bucket entity, qualified as <scheme>://<volume>.<bucket>@<namespace>
+            final String bucketName   = getOzoneBucketName(path);
+            final String bucketQName  = volumeUri + QNAME_SEP_ENTITY_NAME + bucketName + QNAME_SEP_METADATA_NAMESPACE + namespace;
+            AtlasEntity  bucketEntity = context.getEntity(bucketQName);
+
+            if (bucketEntity == null) {
+                bucketEntity = new AtlasEntity(OZONE_BUCKET);
+
+                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQName);
+                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+                bucketEntity.setRelationshipAttribute(ATTRIBUTE_VOLUME, AtlasTypeUtil.getAtlasRelatedObjectId(volumeEntity, RELATIONSHIP_OZONE_VOLUME_BUCKET));
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(bucketQName, bucketEntity);
+            }
+
+            extInfo.addReferredEntity(bucketEntity);
+
+            // key entity
+            keyEntity = new AtlasEntity(OZONE_KEY);
+
+            keyEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, keyQName);
+            keyEntity.setAttribute(ATTRIBUTE_NAME, uri.getPath());
+            keyEntity.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_OZONE_BUCKET_KEY));
+
+            context.putEntity(keyQName, keyEntity);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addOzonePathEntity(strPath={})", strPath);
+        }
+
+        return keyEntity;
+    }
+
+    /**
+     * Builds (or fetches from the context) the hdfs_path entity for a path.
+     *
+     * The path string is lower-cased first when the context requests it. When
+     * HdfsNameServiceResolver reports a name-service id for the path, the "path" attribute uses the
+     * resolver's name-service form of the path and the id is stored as "nameServiceId".
+     *
+     * @return the cached or newly created hdfs_path entity
+     */
+    private static AtlasEntity addHDFSPathEntity(Path path, PathExtractorContext context) {
+        final boolean lowerCase = context.isConvertPathToLowerCase();
+        final String  strPath   = lowerCase ? path.toString().toLowerCase() : path.toString();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addHDFSPathEntity(strPath={})", strPath);
+        }
+
+        final String namespace     = context.getMetadataNamespace();
+        final String nameServiceId = HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
+        final String attrPath;
+
+        if (StringUtils.isEmpty(nameServiceId)) {
+            attrPath = strPath;
+        } else {
+            attrPath = HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
+        }
+
+        final String qualifiedName = getQualifiedName(attrPath, namespace);
+        AtlasEntity  hdfsEntity    = context.getEntity(qualifiedName);
+
+        if (hdfsEntity == null) {
+            hdfsEntity = new AtlasEntity(HDFS_TYPE_PATH);
+
+            if (StringUtils.isNotEmpty(nameServiceId)) {
+                hdfsEntity.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceId);
+            }
+
+            // the name is the path without scheme and authority, lower-cased on request
+            String name = Path.getPathWithoutSchemeAndAuthority(path).toString();
+
+            if (lowerCase) {
+                name = name.toLowerCase();
+            }
+
+            hdfsEntity.setAttribute(ATTRIBUTE_PATH, attrPath);
+            hdfsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+            hdfsEntity.setAttribute(ATTRIBUTE_NAME, name);
+            hdfsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, namespace);
+
+            context.putEntity(qualifiedName, hdfsEntity);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addHDFSPathEntity(strPath={})", strPath);
+        }
+
+        return hdfsEntity;
+    }
+
+    /**
+     * Extracts the storage-account name from an ABFS URI host of the form
+     * "<account_name>.dfs.core.windows.net".
+     *
+     * @return the part of the host before the first ".dfs.core.windows.net", or null when the
+     *         host is empty or does not contain that suffix
+     */
+    private static String getAbfsStorageAccountName(URI uri) {
+        final String host = uri.getHost();
+
+        if (StringUtils.isEmpty(host)) {
+            return null;
+        }
+
+        final int suffixAt = host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX);
+
+        return suffixAt >= 0 ? host.substring(0, suffixAt) : null;
+    }
+
+    /**
+     * Extracts the Ozone volume name from an ofs or o3fs path.
+     *
+     * The two schemes place the volume differently:
+     * "o3fs://<bucket_name>.<volume_name>.<ozone.service.id>/<key>" carries it in the authority,
+     * while "ofs://<ozone.service.id>/<volume_name>/<bucket_name>/<key>" carries it as the first
+     * path segment.
+     *
+     * @param path an Ozone path
+     * @return the volume name, or an empty string when the path does not contain one
+     */
+    private static String getOzoneVolumeName(Path path) {
+        if (StringUtils.startsWithIgnoreCase(path.toString(), OZONE_SCHEME)) {
+            // "/volume/bucket/key" splits into ["", "volume", "bucket", "key"], so the volume is at index 1
+            String[] pathParts = StringUtils.defaultString(path.toUri().getPath()).split(Path.SEPARATOR);
+
+            return pathParts.length > 1 ? pathParts[1] : StringUtils.EMPTY;
+        }
+
+        // pathAuthority: "<bucket_name>.<volume_name>.<ozone.service.id>"
+        String[] authorityParts = StringUtils.defaultString(path.toUri().getAuthority()).split("\\.");
+
+        return authorityParts.length > 1 ? authorityParts[1] : StringUtils.EMPTY;
+    }
+
+    /**
+     * Extracts the Ozone bucket name from an ofs or o3fs path.
+     *
+     * The two schemes place the bucket differently:
+     * "o3fs://<bucket_name>.<volume_name>.<ozone.service.id>/<key>" carries it as the first
+     * element of the authority, while "ofs://<ozone.service.id>/<volume_name>/<bucket_name>/<key>"
+     * carries it as the second path segment.
+     *
+     * @param path an Ozone path
+     * @return the bucket name, or an empty string when the path does not contain one
+     */
+    private static String getOzoneBucketName(Path path) {
+        if (StringUtils.startsWithIgnoreCase(path.toString(), OZONE_SCHEME)) {
+            // "/volume/bucket/key" splits into ["", "volume", "bucket", "key"], so the bucket is at index 2
+            String[] pathParts = StringUtils.defaultString(path.toUri().getPath()).split(Path.SEPARATOR);
+
+            return pathParts.length > 2 ? pathParts[2] : StringUtils.EMPTY;
+        }
+
+        // pathAuthority: "<bucket_name>.<volume_name>.<ozone.service.id>"; a missing authority yields ""
+        return StringUtils.defaultString(path.toUri().getAuthority()).split("\\.")[0];
+    }
+
+    /**
+     * Derives the qualified name for a path handled by the HDFS branch.
+     *
+     * @return "<path>@<metadataNamespace>" when the path starts with the HDFS scheme prefix;
+     *         otherwise just the lower-cased path, with no namespace suffix
+     */
+    private static String getQualifiedName(String path, String metadataNamespace) {
+        return path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)
+                ? path + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace
+                : path.toLowerCase();
+    }
+}
\ No newline at end of file
diff --git 
a/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java 
b/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java
new file mode 100644
index 0000000..ce688e4
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Settings and entity cache handed to AtlasPathExtractorUtil while path entities are extracted.
+ *
+ * The cache maps a qualified name to the entity already created for it; when a map is passed
+ * to the constructor, that same map instance is read and written (it is not copied).
+ */
+public class PathExtractorContext {
+    private final String                   metadataNamespace;
+    private final Map<String, AtlasEntity> knownEntities;
+    private final boolean                  isConvertPathToLowerCase;
+    private final String                   awsS3AtlasModelVersion;
+
+    /** Creates a context with an empty cache, no lower-casing and no S3 model version. */
+    public PathExtractorContext(String metadataNamespace) {
+        this(metadataNamespace, new HashMap<>(), false, null);
+    }
+
+    /** Creates a context with an empty cache, no lower-casing and the given S3 model version. */
+    public PathExtractorContext(String metadataNamespace, String awsS3AtlasModelVersion) {
+        this(metadataNamespace, new HashMap<>(), false, awsS3AtlasModelVersion);
+    }
+
+    /** Creates a context with an empty cache and the given lower-casing flag and S3 model version. */
+    public PathExtractorContext(String metadataNamespace, boolean isConvertPathToLowerCase, String awsS3AtlasModelVersion) {
+        this(metadataNamespace, new HashMap<>(), isConvertPathToLowerCase, awsS3AtlasModelVersion);
+    }
+
+    /**
+     * Creates a fully specified context.
+     *
+     * @param metadataNamespace        namespace returned by {@link #getMetadataNamespace()}
+     * @param knownEntities            cache of qualifiedName to entity, used as-is; a null value is
+     *                                 replaced by a new empty map
+     * @param isConvertPathToLowerCase flag returned by {@link #isConvertPathToLowerCase()}
+     * @param awsS3AtlasModelVersion   S3 model version returned by {@link #getAwsS3AtlasModelVersion()}; may be null
+     */
+    public PathExtractorContext(String metadataNamespace, Map<String, AtlasEntity> knownEntities, boolean isConvertPathToLowerCase, String awsS3AtlasModelVersion) {
+        this.metadataNamespace        = metadataNamespace;
+        // fall back to a private map so putEntity()/getEntity() cannot dereference null
+        this.knownEntities            = knownEntities != null ? knownEntities : new HashMap<>();
+        this.isConvertPathToLowerCase = isConvertPathToLowerCase;
+        this.awsS3AtlasModelVersion   = awsS3AtlasModelVersion;
+    }
+
+    public String getMetadataNamespace() {
+        return metadataNamespace;
+    }
+
+    /** Returns the live cache map, not a copy. */
+    public Map<String, AtlasEntity> getKnownEntities() {
+        return knownEntities;
+    }
+
+    /** Stores the entity in the cache under the given qualified name, replacing any previous entry. */
+    public void putEntity(String qualifiedName, AtlasEntity entity) {
+        knownEntities.put(qualifiedName, entity);
+    }
+
+    /** Returns the entity cached under the given qualified name, or null when there is none. */
+    public AtlasEntity getEntity(String qualifiedName) {
+        return knownEntities.get(qualifiedName);
+    }
+
+    public boolean isConvertPathToLowerCase() {
+        return isConvertPathToLowerCase;
+    }
+
+    public String getAwsS3AtlasModelVersion() {
+        return awsS3AtlasModelVersion;
+    }
+}
\ No newline at end of file
diff --git 
a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java 
b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
new file mode 100644
index 0000000..664bfb7
--- /dev/null
+++ 
b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+import org.apache.hadoop.fs.Path;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+public class AtlasPathExtractorUtilTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasPathExtractorUtilTest.class);
+
+    // Common
+    private static final String METADATA_NAMESPACE          = "metaspace";
+    private static final String QNAME_METADATA_NAMESPACE    = '@' + 
METADATA_NAMESPACE;
+    private static final String SCHEME_SEPARATOR            = "://";
+    private static final String ATTRIBUTE_NAME              = "name";
+    private static final String ATTRIBUTE_QUALIFIED_NAME    = "qualifiedName";
+
+    // HDFS
+    private static final String HDFS_PATH_TYPE           = "hdfs_path";
+    private static final String ATTRIBUTE_PATH           = "path";
+    private static final String ATTRIBUTE_CLUSTER_NAME   = "clusterName";
+
+    // Ozone
+    private static final String OZONE_VOLUME      = "ozone_volume";
+    private static final String OZONE_BUCKET      = "ozone_bucket";
+    private static final String OZONE_KEY         = "ozone_key";
+    private static final String OZONE_SCHEME      = "ofs" + SCHEME_SEPARATOR;
+    private static final String OZONE_3_SCHEME    = "o3fs" + SCHEME_SEPARATOR;
+    private static final String OZONE_PATH        = OZONE_SCHEME + 
"bucket1.volume1.ozone1/files/file.txt";
+    private static final String OZONE_3_PATH      = OZONE_3_SCHEME + 
"bucket1.volume1.ozone1/files/file.txt";
+
+    // HDFS
+    private static final String HDFS_SCHEME    = "hdfs" + SCHEME_SEPARATOR;
+    private static final String HDFS_PATH      = HDFS_SCHEME + 
"host_name:8020/warehouse/tablespace/external/hive/taBlE_306";
+
+    @Test
+    public void testGetPathEntityOzone3Path() {
+        PathExtractorContext extractorContext = new 
PathExtractorContext(METADATA_NAMESPACE);
+
+        Path path = new Path(OZONE_3_PATH);
+        AtlasEntityWithExtInfo entityWithExtInfo = 
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity entity = entityWithExtInfo.getEntity();
+
+        assertNotNull(entity);
+        assertEquals(entity.getTypeName(), OZONE_KEY);
+        verifyOzoneKeyEntity(OZONE_3_PATH, entity);
+
+        assertEquals(entityWithExtInfo.getReferredEntities().size(), 2);
+        verifyOzoneEntities(OZONE_3_SCHEME, OZONE_3_PATH, 
extractorContext.getKnownEntities());
+
+        assertEquals(extractorContext.getKnownEntities().size(), 3);
+        verifyOzoneEntities(OZONE_3_SCHEME, OZONE_3_PATH, 
extractorContext.getKnownEntities());
+    }
+
+    @Test
+    public void testGetPathEntityOzonePath() {
+        PathExtractorContext extractorContext = new 
PathExtractorContext(METADATA_NAMESPACE);
+
+        Path path = new Path(OZONE_PATH);
+        AtlasEntityWithExtInfo entityWithExtInfo = 
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity entity = entityWithExtInfo.getEntity();
+
+        assertNotNull(entity);
+        assertEquals(entity.getTypeName(), OZONE_KEY);
+        verifyOzoneKeyEntity(OZONE_PATH, entity);
+
+        assertEquals(entityWithExtInfo.getReferredEntities().size(), 2);
+        verifyOzoneEntities(OZONE_SCHEME, OZONE_PATH, 
extractorContext.getKnownEntities());
+
+        assertEquals(extractorContext.getKnownEntities().size(), 3);
+        verifyOzoneEntities(OZONE_SCHEME, OZONE_PATH, 
extractorContext.getKnownEntities());
+    }
+
+    @Test
+    public void testGetPathEntityHdfsPath() {
+        Map<String, AtlasEntity> knownEntities = new HashMap<>();
+        AtlasEntityWithExtInfo extInfo = new AtlasEntityWithExtInfo();
+
+        PathExtractorContext extractorContext = new 
PathExtractorContext(METADATA_NAMESPACE);
+
+        Path path = new Path(HDFS_PATH);
+        AtlasEntityWithExtInfo entityWithExtInfo = 
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity entity = entityWithExtInfo.getEntity();
+
+        assertNotNull(entity);
+        assertEquals(entity.getTypeName(), HDFS_PATH_TYPE);
+        verifyHDFSEntity(entity, false);
+
+        assertNull(extInfo.getReferredEntities());
+        assertEquals(extractorContext.getKnownEntities().size(), 1);
+        extractorContext.getKnownEntities().values().forEach(x -> 
verifyHDFSEntity(x, false));
+    }
+
+    @Test
+    public void testGetPathEntityHdfsPathLowerCase() {
+        PathExtractorContext extractorContext = new 
PathExtractorContext(METADATA_NAMESPACE, true, null);
+
+        Path path = new Path(HDFS_PATH);
+        AtlasEntityWithExtInfo entityWithExtInfo = 
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+        AtlasEntity entity = entityWithExtInfo.getEntity();
+
+        assertNotNull(entity);
+        assertEquals(entity.getTypeName(), HDFS_PATH_TYPE);
+        verifyHDFSEntity(entity, true);
+
+        assertNull(entityWithExtInfo.getReferredEntities());
+        assertEquals(extractorContext.getKnownEntities().size(), 1);
+        extractorContext.getKnownEntities().values().forEach(x -> 
verifyHDFSEntity(x, true));
+    }
+
+    private void verifyOzoneEntities(String scheme, String path, Map<String, 
AtlasEntity> knownEntities) {
+        for (AtlasEntity knownEntity : knownEntities.values()) {
+            switch (knownEntity.getTypeName()){
+                case OZONE_KEY:
+                    verifyOzoneKeyEntity(path, knownEntity);
+                    break;
+
+                case OZONE_VOLUME:
+                    
assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + 
"volume1" + QNAME_METADATA_NAMESPACE);
+                    assertEquals(knownEntity.getAttribute(ATTRIBUTE_NAME), 
"volume1");
+                    break;
+
+                case OZONE_BUCKET:
+                    
assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + 
"volume1.bucket1" + QNAME_METADATA_NAMESPACE);
+                    assertEquals(knownEntity.getAttribute(ATTRIBUTE_NAME), 
"bucket1");
+                    break;
+            }
+        }
+    }
+
+    private void verifyOzoneKeyEntity(String path, AtlasEntity entity) {
+        assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), path + 
QNAME_METADATA_NAMESPACE);
+        assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "/files/file.txt");
+    }
+
+    private void verifyHDFSEntity(AtlasEntity entity, boolean toLowerCase) {
+        if (toLowerCase) {
+            assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), 
HDFS_PATH.toLowerCase() + QNAME_METADATA_NAMESPACE);
+            assertEquals(entity.getAttribute(ATTRIBUTE_NAME), 
"/warehouse/tablespace/external/hive/table_306");
+            assertEquals(entity.getAttribute(ATTRIBUTE_PATH), 
HDFS_PATH.toLowerCase());
+            assertEquals(entity.getAttribute(ATTRIBUTE_CLUSTER_NAME), 
METADATA_NAMESPACE);
+        } else {
+            assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), 
HDFS_PATH + QNAME_METADATA_NAMESPACE);
+            assertEquals(entity.getAttribute(ATTRIBUTE_NAME), 
"/warehouse/tablespace/external/hive/taBlE_306");
+            assertEquals(entity.getAttribute(ATTRIBUTE_PATH), HDFS_PATH);
+            assertEquals(entity.getAttribute(ATTRIBUTE_CLUSTER_NAME), 
METADATA_NAMESPACE);
+        }
+    }
+}

Reply via email to