ATLAS-2864: Improved incremental export queries. Signed-off-by: Ashutosh Mestry <ames...@hortonworks.com>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/558fe964 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/558fe964 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/558fe964 Branch: refs/heads/branch-1.0 Commit: 558fe96423780449ca91577430a12934b400bb75 Parents: bf24045 Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Wed Sep 12 21:51:09 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Thu Nov 1 15:42:55 2018 -0700 ---------------------------------------------------------------------- .../atlas/model/impexp/AtlasExportRequest.java | 33 ++-- .../atlas/repository/impexp/ExportService.java | 186 +++++++++++-------- .../impexp/HdfsPathEntityCreator.java | 8 +- .../impexp/IncrementalExportEntityProvider.java | 111 +++++++++++ .../atlas/repository/util/UniqueList.java | 6 + .../IncrementalExportEntityProviderTest.java | 94 ++++++++++ .../stocksDB-Entities/export-incremental.json | 1 + 7 files changed, 350 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java index 7bb8b76..e78bb53 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java @@ -80,18 +80,6 @@ public class AtlasExportRequest implements Serializable { this.options = options; } - public String getMatchTypeOptionValue() { - String matchType = null; - - if (MapUtils.isNotEmpty(getOptions())) { - if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { - matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); - } - } - - return matchType; - } - public String getFetchTypeOptionValue() { if(getOptions() == null || !getOptions().containsKey(OPTION_FETCH_TYPE)) { return FETCH_TYPE_FULL; @@ -122,6 +110,27 @@ public class AtlasExportRequest implements Serializable { return false; } + public String getMatchTypeOptionValue() { + String matchType = null; + + if (MapUtils.isNotEmpty(getOptions())) { + if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { + matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); + } + } + + return matchType; + } + + public long getChangeTokenFromOptions() { + if(getFetchTypeOptionValue().equalsIgnoreCase(FETCH_TYPE_INCREMENTAL) && + getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) { + return Long.parseLong(getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString()); + } + + return 0L; + } + public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index f10d615..5e972a2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -70,7 +70,7 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.*; public class ExportService { private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); - private static final String PROPERTY_GUID = "__guid"; + public static final String PROPERTY_GUID = "__guid"; private static final String PROPERTY_IS_PROCESS = "isProcess"; @@ -82,6 +82,8 @@ public class ExportService { private final AtlasGremlinQueryProvider gremlinQueryProvider; private ExportTypeProcessor exportTypeProcessor; private final HdfsPathEntityCreator hdfsPathEntityCreator; + private IncrementalExportEntityProvider incrementalExportEntityProvider; + @Inject public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) { @@ -95,12 +97,12 @@ public class ExportService { public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException { - long startTime = System.currentTimeMillis(); - AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, + long startTime = System.currentTimeMillis(); + AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime, getCurrentChangeMarker()); - ExportContext context = new ExportContext(atlasGraph, result, exportSink); - exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context); + ExportContext context = new ExportContext(atlasGraph, result, exportSink); + exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context); try { LOG.info("==> export(user={}, from={})", userName, requestingIP); @@ -114,9 +116,11 @@ public class ExportService { LOG.error("Operation failed: ", ex); } finally { atlasGraph.releaseGremlinScriptEngine(context.scriptEngine); - LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus()); + LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}", + userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker()); context.clear(); result.clear(); + incrementalExportEntityProvider = null; } return context.result; @@ -177,7 +181,7 @@ public class ExportService { } } - private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException { + private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) { AtlasExportResult.OperationStatus statuses[] = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()]; List<AtlasObjectId> itemsToExport = request.getItemsToExport(); for (int i = 0; i < itemsToExport.size(); i++) { @@ -211,13 +215,14 @@ public class ExportService { } for (String guid : entityGuids) { - processEntity(guid, context); + processEntityGuid(guid, context); + populateEntitesForIncremental(guid, context); } while (!context.guidsToProcess.isEmpty()) { while (!context.guidsToProcess.isEmpty()) { String guid = context.guidsToProcess.remove(0); - processEntity(guid, context); + processEntityGuid(guid, context); } if (!context.lineageToProcess.isEmpty()) { @@ -253,55 +258,61 @@ public class ExportService { if (StringUtils.isNotEmpty(item.getGuid())) { ret = Collections.singletonList(item.getGuid()); } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) { - final String queryTemplate = getQueryTemplateForMatchType(context); + ret = getStartingEntityForMatchTypeForType(item, context); + } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) { + ret = getStartingEntityUsingQueryTemplate(item, context, ret); + } - setupBindingsForTypeName(context, item.getTypeName()); + if (ret == null) { + ret = Collections.emptyList(); + } - ret = executeGremlinQueryForGuids(queryTemplate, context); - } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) { - final String queryTemplate = getQueryTemplateForMatchType(context); - final String typeName = item.getTypeName(); - final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + logInfoStartingEntitiesFound(item, context, ret); + return ret; + } - if (entityType == null) { - throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName); - } + private List<String> getStartingEntityUsingQueryTemplate(AtlasObjectId item, ExportContext context, List<String> ret) throws AtlasBaseException { + final String queryTemplate = getQueryTemplateForMatchType(context); + final String typeName = item.getTypeName(); + final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); - for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) { - String attrName = e.getKey(); - Object attrValue = e.getValue(); + if (entityType == null) { + throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName); + } - AtlasAttribute attribute = entityType.getAttribute(attrName); - if (attribute == null || attrValue == null) { - continue; - } + for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) { + String attrName = e.getKey(); + Object attrValue = e.getValue(); - setupBindingsForTypeNameAttrNameAttrValue(context, typeName, attrValue, attribute); + AtlasAttribute attribute = entityType.getAttribute(attrName); + if (attribute == null || attrValue == null) { + continue; + } - List<String> guids = executeGremlinQueryForGuids(queryTemplate, context); + setupBindingsForTypeNameAttrNameAttrValue(context, typeName, attrValue, attribute); - if (CollectionUtils.isNotEmpty(guids)) { - if (ret == null) { - ret = new ArrayList<>(); - } + List<String> guids = executeGremlinQueryForGuids(queryTemplate, context); - for (String guid : guids) { - if (!ret.contains(guid)) { - ret.add(guid); - } + if (CollectionUtils.isNotEmpty(guids)) { + if (ret == null) { + ret = new ArrayList<>(); + } + + for (String guid : guids) { + if (!ret.contains(guid)) { + ret.add(guid); } } } } - - if (ret == null) { - ret = Collections.emptyList(); - } - - logInfoStartingEntitiesFound(item, context, ret); return ret; } + private List<String> getStartingEntityForMatchTypeForType(AtlasObjectId item, ExportContext context) { + setupBindingsForTypeName(context, item.getTypeName()); + return executeGremlinQueryForGuids(getQueryTemplateForMatchType(context), context); + } + private void logInfoStartingEntitiesFound(AtlasObjectId item, ExportContext context, List<String> ret) { LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities: options: {}", item, context.matchType, context.fetchType, ret.size(), AtlasType.toJson(context.result.getRequest())); @@ -344,34 +355,43 @@ public class ExportService { return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT); } - private void processEntity(String guid, ExportContext context) throws AtlasBaseException { - debugLog("==> processEntity({})", guid); + private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException { + debugLog("==> processEntityGuid({})", guid); - if (!context.guidsProcessed.contains(guid)) { - TraversalDirection direction = context.guidDirection.get(guid); - AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); + if (context.guidsProcessed.contains(guid)) { + return; + } + + TraversalDirection direction = context.guidDirection.get(guid); + AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); + + processEntity(guid, entityWithExtInfo, context, direction); + + debugLog("<== processEntityGuid({})", guid); + } + + public void processEntity(String guid, AtlasEntityWithExtInfo entityWithExtInfo, + ExportContext context, + TraversalDirection direction) throws AtlasBaseException { if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) { context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); } - addEntity(entityWithExtInfo, context); - exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context); + addEntity(entityWithExtInfo, context); + exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context); - context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); - getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); + context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); + getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); - if(entityWithExtInfo.getReferredEntities() != null) { - for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { - exportTypeProcessor.addTypes(e, context); - getConntedEntitiesBasedOnOption(e, context, direction); - } - - context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet()); + if (entityWithExtInfo.getReferredEntities() != null) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + exportTypeProcessor.addTypes(e, context); + getConntedEntitiesBasedOnOption(e, context, direction); } - } - debugLog("<== processEntity({})", guid); + context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet()); + } } private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) { @@ -381,12 +401,25 @@ public class ExportService { break; case INCREMENTAL: + if(context.isHiveDBIncrementalSkipLineage()) { + break; + } + case FULL: default: getEntityGuidsForFullFetch(entity, context); } } + private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) throws AtlasBaseException { + if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) { + return; + } + + incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine); + incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess); + } + private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) { if (direction == null || direction == TraversalDirection.UNKNOWN) { getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD); @@ -651,7 +684,7 @@ public class ExportService { } } - private enum TraversalDirection { + public enum TraversalDirection { UNKNOWN, INWARD, OUTWARD, @@ -682,9 +715,11 @@ public class ExportService { static class ExportContext { private static final int REPORTING_THREASHOLD = 1000; + private static final String ATLAS_TYPE_HIVE_DB = "hive_db"; + final Set<String> guidsProcessed = new HashSet<>(); - final UniqueList<String> guidsToProcess = new UniqueList<>(); + final private UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> lineageToProcess = new UniqueList<>(); final Set<String> lineageProcessed = new HashSet<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>(); @@ -700,7 +735,8 @@ public class ExportService { private final ExportFetchType fetchType; private final String matchType; private final boolean skipLineage; - private final long changeMarker; + private final long changeMarker; + private final boolean isHiveDBIncremental; private int progressReportCount = 0; @@ -713,16 +749,18 @@ public class ExportService { fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue()); matchType = result.getRequest().getMatchTypeOptionValue(); skipLineage = result.getRequest().getSkipLineageOptionValue(); - this.changeMarker = getChangeTokenFromOptions(fetchType, result.getRequest()); + this.changeMarker = result.getRequest().getChangeTokenFromOptions(); + this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest()); } - private long getChangeTokenFromOptions(ExportFetchType fetchType, AtlasExportRequest request) { - if(fetchType == ExportFetchType.INCREMENTAL && - request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) { - return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString()); + private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) { + if(request.getItemsToExport().size() == 0) { + return false; } - return 0L; + return request.getItemsToExport().get(0).getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_DB) && + request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) && + request.getSkipLineageOptionValue(); } public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) { @@ -752,12 +790,10 @@ public class ExportService { } public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) { - if(!isSuperTypeProcess) { - guidsToProcess.add(guid); - } - if(isSuperTypeProcess) { lineageToProcess.add(guid); + } else { + guidsToProcess.add(guid); } guidDirection.put(guid, direction); @@ -786,5 +822,9 @@ public class ExportService { public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException { sink.add(entityWithExtInfo); } + + public boolean isHiveDBIncrementalSkipLineage() { + return isHiveDBIncremental; + } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java index fddd60b..4a09c0f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java @@ -22,8 +22,8 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1; -import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.slf4j.Logger; @@ -50,10 +50,10 @@ public class HdfsPathEntityCreator { private final String PATH_SEPARATOR = "/"; private AtlasTypeRegistry typeRegistry; - private AtlasEntityStoreV1 entityStore; + private AtlasEntityStoreV2 entityStore; @Inject - public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV1 entityStore) { + public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV2 entityStore) { this.typeRegistry = typeRegistry; this.entityStore = entityStore; } http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java new file mode 100644 index 0000000..3a2a917 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.util.UniqueList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class IncrementalExportEntityProvider { + private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class); + + private static final String QUERY_PARAMETER_START_GUID = "startGuid"; + private static final String QUERY_PARAMETER_MODIFICATION_TIMESTAMP = "modificationTimestamp"; + + private AtlasGraph atlasGraph; + + private static final String QUERY_DB = "g.V().has('__guid', startGuid)"; + private static final String QUERY_TABLE = QUERY_DB + ".in('__hive_table.db')"; + private static final String QUERY_SD = QUERY_TABLE + ".out('__hive_table.sd')"; + private static final String QUERY_COLUMN = QUERY_TABLE + ".out('__hive_table.columns')"; + private static final String TRANSFORM_CLAUSE = ".project('__guid').by('__guid').dedup().toList()"; + private static final String TIMESTAMP_CLAUSE = ".has('__modificationTimestamp', gt(modificationTimestamp))"; + + private ScriptEngine scriptEngine; + + @Inject + public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) { + this.atlasGraph = atlasGraph; + this.scriptEngine = scriptEngine; + } + + public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) { + if(timeStamp == 0L) { + full(dbEntityGuid, guidsToProcess); + } else { + partial(dbEntityGuid, timeStamp, guidsToProcess); + } + } + + private void partial(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) { + guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, timeStamp)); + guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp)); + guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_COLUMN, timeStamp)); + } + + private void full(String dbEntityGuid, UniqueList<String> guidsToProcess) { + guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, 0L)); + } + + private List<String> fetchGuids(final String dbEntityGuid, String query, long timeStamp) { + Map<String, Object> bindings = new HashMap<String, Object>() {{ + put(QUERY_PARAMETER_START_GUID, dbEntityGuid); + }}; + + String queryWithClause = query; + if(timeStamp > 0L) { + bindings.put(QUERY_PARAMETER_MODIFICATION_TIMESTAMP, timeStamp); + queryWithClause = queryWithClause.concat(TIMESTAMP_CLAUSE); + } + + return executeGremlinQuery(queryWithClause, bindings); + } + + private List<String> executeGremlinQuery(String query, Map<String, Object> bindings) { + try { + List<String> guids = new ArrayList<>(); + String queryWithTransform = query + TRANSFORM_CLAUSE; + List<Map<String, Object>> result = (List<Map<String, Object>>) + atlasGraph.executeGremlinScript(scriptEngine, bindings, queryWithTransform, false); + if (result == null) { + return guids; + } + + for (Map<String, Object> item : result) { + guids.add((String) item.get(ExportService.PROPERTY_GUID)); + } + + return guids; + + } catch (ScriptException e) { + LOG.error("error executing query: {}: bindings: {}", query, bindings, e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java index 9148ce0..eebbc4e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java +++ b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java @@ -44,6 +44,12 @@ public class UniqueList<T> { } } + public void addAll(List<T> list) { + for (T item : list) { + add(item); + } + } + public T remove(int index) { T e = list.remove(index); set.remove(e); http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java new file mode 100644 index 0000000..de0a8f8 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.TestModules; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.util.UniqueList; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import javax.script.ScriptEngine; +import java.io.IOException; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class IncrementalExportEntityProviderTest extends ExportImportTestBase { + @Inject + AtlasTypeRegistry typeRegistry; + + @Inject + private AtlasTypeDefStore typeDefStore; + + @Inject + private AtlasEntityStoreV2 entityStore; + + @Inject + private AtlasGraph atlasGraph; + + private IncrementalExportEntityProvider incrementalExportEntityProvider; + private ScriptEngine gremlinScriptEngine; + + @BeforeClass + public void setup() throws IOException, AtlasBaseException { + basicSetup(typeDefStore, typeRegistry); + createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"}); + final String[] entityGuids = {DB_GUID, TABLE_GUID}; + verifyCreatedEntities(entityStore, entityGuids, 2); + + gremlinScriptEngine = atlasGraph.getGremlinScriptEngine(); + EntityGraphRetriever entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); + incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine); + } + + @AfterClass + public void tearDown() { + if(gremlinScriptEngine != null) { + atlasGraph.releaseGremlinScriptEngine(gremlinScriptEngine); + } + } + + @Test + public void verify() { + executeQueries(0L, 1); + executeQueries(1L, 9); + } + + private void executeQueries(long timeStamp, int expectedEntityCount) { + UniqueList<String> uniqueList = new UniqueList<>(); + incrementalExportEntityProvider.populate(DB_GUID, timeStamp, uniqueList); + + for (String g : uniqueList.getList()) { + assertTrue(g instanceof String); + } + + assertEquals(uniqueList.size(), expectedEntityCount); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json index fdd3b01..ba125e7 100644 --- a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json +++ b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json @@ -6,6 +6,7 @@ ], "options": { "fetchType": "incremental", + "skipLineage": "true", "changeMarker": 0 } }