ATLAS-2869: Requested hdfs_path entities are created if they do not exist, and then the export proceeds.
Signed-off-by: Ashutosh Mestry <ames...@hortonworks.com> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/b888ade5 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/b888ade5 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/b888ade5 Branch: refs/heads/branch-0.8 Commit: b888ade513ce6b72d9361b5969c8cf55262e5e84 Parents: 56b36f6 Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Tue Sep 11 15:29:02 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Tue Sep 11 22:26:04 2018 -0700 ---------------------------------------------------------------------- .../atlas/repository/impexp/ExportService.java | 12 +- .../impexp/HdfsPathEntityCreator.java | 131 +++++++++++++++++++ .../impexp/HdfsPathEntityCreatorTest.java | 81 ++++++++++++ 3 files changed, 221 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/b888ade5/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index aded67c..612549d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -74,14 +74,16 @@ public class ExportService { private final EntityGraphRetriever entityGraphRetriever; private final AtlasGremlinQueryProvider gremlinQueryProvider; private ExportTypeProcessor exportTypeProcessor; - + private final HdfsPathEntityCreator hdfsPathEntityCreator; @Inject - public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter) { + public ExportService(final AtlasTypeRegistry 
typeRegistry, AtlasGraph atlasGraph, + AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) { this.typeRegistry = typeRegistry; this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); this.atlasGraph = atlasGraph; this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; - this.auditsWriter = auditsWriter; + this.auditsWriter = auditsWriter; + this.hdfsPathEntityCreator = hdfsPathEntityCreator; } public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, @@ -237,6 +239,10 @@ public class ExportService { private List<String> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { List<String> ret = null; + if(item.getTypeName().equalsIgnoreCase(HdfsPathEntityCreator.HDFS_PATH_TYPE)) { + hdfsPathEntityCreator.getCreateEntity(item); + } + if (StringUtils.isNotEmpty(item.getGuid())) { ret = Collections.singletonList(item.getGuid()); } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) { http://git-wip-us.apache.org/repos/asf/atlas/blob/b888ade5/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java new file mode 100644 index 0000000..fddd60b --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.AuditsWriter.getCurrentClusterName;
+
+/**
+ * Looks up an hdfs_path entity by path and cluster name, creating it when it does not exist yet.
+ * <p>
+ * Paths are normalized to end with "/" before the qualified name ("path@clusterName") is built,
+ * so "hdfs://nn/a" and "hdfs://nn/a/" resolve to the same entity.
+ */
+@Component
+public class HdfsPathEntityCreator {
+    protected static final Logger LOG = LoggerFactory.getLogger(HdfsPathEntityCreator.class);
+
+    public static final String HDFS_PATH_TYPE                        = "hdfs_path";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_NAME         = "name";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME = "clusterName";
+    public static final String HDFS_PATH_ATTRIBUTE_NAME_PATH         = "path";
+    public static final String HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME    = "qualifiedName";
+
+    private static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+    private static final String PATH_SEPARATOR        = "/";
+
+    private final AtlasTypeRegistry  typeRegistry;
+    private final AtlasEntityStoreV1 entityStore;
+
+    @Inject
+    public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV1 entityStore) {
+        this.typeRegistry = typeRegistry;
+        this.entityStore  = entityStore;
+    }
+
+    /**
+     * Gets or creates, in the current cluster, the hdfs_path entity named by the "path" unique attribute of {@code item}.
+     *
+     * @param item object id whose unique attributes may carry a "path" entry
+     * @return the existing or newly created entity; null when {@code item} carries no usable path
+     *         or the entity could not be found after the create attempt
+     * @throws AtlasBaseException propagated from the type registry or the entity store
+     */
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(AtlasObjectId item) throws AtlasBaseException {
+        Map<String, Object> uniqueAttributes = (item == null) ? null : item.getUniqueAttributes();
+        Object              pathValue        = (uniqueAttributes == null) ? null : uniqueAttributes.get(HDFS_PATH_ATTRIBUTE_NAME_PATH);
+
+        // Only act when a path was actually supplied; a missing or non-String value cannot name an HDFS path.
+        if (!(pathValue instanceof String)) {
+            return null;
+        }
+
+        return getCreateEntity((String) pathValue);
+    }
+
+    /**
+     * Gets or creates the hdfs_path entity for {@code path} in the cluster reported by AuditsWriter.getCurrentClusterName().
+     *
+     * @param path HDFS path; a trailing "/" is appended when missing
+     * @return the existing or newly created entity, or null (see {@link #getCreateEntity(String, String)})
+     * @throws AtlasBaseException propagated from the type registry or the entity store
+     */
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path) throws AtlasBaseException {
+        return getCreateEntity(path, getCurrentClusterName());
+    }
+
+    /**
+     * Gets or creates the hdfs_path entity whose qualified name is "path/@clusterName".
+     *
+     * @param path        HDFS path; a trailing "/" is appended when missing
+     * @param clusterName cluster the path belongs to
+     * @return the existing or newly created entity; null when {@code path} is null/empty
+     *         or the entity could not be found after the create attempt
+     * @throws AtlasBaseException propagated from the type registry or the entity store
+     */
+    public AtlasEntity.AtlasEntityWithExtInfo getCreateEntity(String path, String clusterName) throws AtlasBaseException {
+        if (path == null || path.isEmpty()) {
+            // Without this guard an empty path would be normalized to "/" and a bogus root entity created.
+            LOG.warn("getCreateEntity: empty path for cluster {}; no {} entity created", clusterName, HDFS_PATH_TYPE);
+
+            return null;
+        }
+
+        String                             pathWithTrailingSeparator = getPathWithTrailingSeparator(path);
+        AtlasEntityType                    hdfsPathEntityType        = getHdfsPathEntityType();
+        AtlasEntity.AtlasEntityWithExtInfo existingEntity            = getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+
+        if (existingEntity != null) {
+            return existingEntity;
+        }
+
+        AtlasEntity            entity   = createHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+        EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entity), false);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getCreateEntity({}, {}): createOrUpdate response: {}", pathWithTrailingSeparator, clusterName, response);
+        }
+
+        // Re-read instead of inspecting response.getCreatedEntities(): that list may be null, and if the same
+        // path was created concurrently the store may report an update rather than a create.
+        return getHDFSPathEntity(hdfsPathEntityType, pathWithTrailingSeparator, clusterName);
+    }
+
+    /** Builds an unsaved hdfs_path entity whose path and name are both the normalized path. */
+    private AtlasEntity createHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
+        AtlasEntity entity = hdfsPathEntityType.createDefaultValue();
+
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_PATH, path);
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME, path);
+        entity.setAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME, clusterName);
+
+        return entity;
+    }
+
+    /** Looks the entity up by qualified name; returns null when the store lookup fails. */
+    private AtlasEntity.AtlasEntityWithExtInfo getHDFSPathEntity(AtlasEntityType hdfsPathEntityType, String path, String clusterName) {
+        try {
+            return entityStore.getByUniqueAttributes(hdfsPathEntityType, getUniqueAttributes(path, clusterName));
+        } catch (AtlasBaseException e) {
+            // Deliberately best-effort: any lookup failure is treated as "not found" so the caller can create it.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} entity {} not found", HDFS_PATH_TYPE, getQualifiedName(path, clusterName), e);
+            }
+
+            return null;
+        }
+    }
+
+    private AtlasEntityType getHdfsPathEntityType() throws AtlasBaseException {
+        return (AtlasEntityType) typeRegistry.getType(HDFS_PATH_TYPE);
+    }
+
+    private Map<String, Object> getUniqueAttributes(String path, String clusterName) {
+        Map<String, Object> ret = new HashMap<String, Object>();
+
+        ret.put(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(path, clusterName));
+
+        return ret;
+    }
+
+    private String getPathWithTrailingSeparator(String path) {
+        if (path.endsWith(PATH_SEPARATOR)) {
+            return path;
+        }
+
+        return path + PATH_SEPARATOR;
+    }
+
+    /** Returns "path@clusterName"; {@code path} is used as given (callers pass the normalized path). */
+    public static String getQualifiedName(String path, String clusterName) {
+        return String.format(QUALIFIED_NAME_FORMAT, path, clusterName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/b888ade5/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
new file mode 100644
index 0000000..1863b8d
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreatorTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_NAME_PATH;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME;
+import static org.apache.atlas.repository.impexp.HdfsPathEntityCreator.getQualifiedName;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadFsModel;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Verifies that HdfsPathEntityCreator creates an hdfs_path entity on the first request and returns
+ * that same entity (same GUID) on later requests, with or without a trailing "/" in the input path.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class HdfsPathEntityCreatorTest extends ExportImportTestBase {
+    private static final String expectedPath        = "hdfs://server-name/warehouse/hr";
+    private static final String expectedClusterName = "cl1";
+    // The creator stores paths normalized with a trailing "/".
+    private static final String expectedStoredPath  = expectedPath + "/";
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private HdfsPathEntityCreator hdfsPathEntityCreator;
+
+    // GUID of the entity created in verifyCreate; dependent tests check that they get the same entity back.
+    private String createdGuid;
+
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        basicSetup(typeDefStore, typeRegistry);
+        loadFsModel(typeDefStore, typeRegistry);
+    }
+
+    @Test
+    public void verifyCreate() throws AtlasBaseException {
+        String                             expectedQualifiedName = getQualifiedName(expectedStoredPath, expectedClusterName);
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo     = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
+
+        assertNotNull(entityWithExtInfo);
+
+        AtlasEntity entity = entityWithExtInfo.getEntity();
+
+        assertNotNull(entity.getGuid());
+        assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_PATH), expectedStoredPath);
+        assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_QUALIFIED_NAME), expectedQualifiedName);
+        assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_NAME), expectedStoredPath);
+        assertEquals(entity.getAttribute(HDFS_PATH_ATTRIBUTE_NAME_CLUSTER_NAME), expectedClusterName);
+
+        createdGuid = entity.getGuid();
+    }
+
+    @Test(dependsOnMethods = "verifyCreate")
+    public void verifyGet() throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = hdfsPathEntityCreator.getCreateEntity(expectedPath, expectedClusterName);
+
+        assertNotNull(entityWithExtInfo);
+        // A repeated request must return the existing entity, not create a second one.
+        assertEquals(entityWithExtInfo.getEntity().getGuid(), createdGuid);
+    }
+
+    @Test(dependsOnMethods = "verifyCreate")
+    public void verifyGetWithTrailingSeparator() throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = hdfsPathEntityCreator.getCreateEntity(expectedStoredPath, expectedClusterName);
+
+        assertNotNull(entityWithExtInfo);
+        assertEquals(entityWithExtInfo.getEntity().getGuid(), createdGuid);
+    }
+}