ATLAS-2864: Improved incremental export queries.

Signed-off-by: Ashutosh Mestry <ames...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/558fe964
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/558fe964
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/558fe964

Branch: refs/heads/branch-1.0
Commit: 558fe96423780449ca91577430a12934b400bb75
Parents: bf24045
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Wed Sep 12 21:51:09 2018 -0700
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Thu Nov 1 15:42:55 2018 -0700

----------------------------------------------------------------------
 .../atlas/model/impexp/AtlasExportRequest.java  |  33 ++--
 .../atlas/repository/impexp/ExportService.java  | 186 +++++++++++--------
 .../impexp/HdfsPathEntityCreator.java           |   8 +-
 .../impexp/IncrementalExportEntityProvider.java | 111 +++++++++++
 .../atlas/repository/util/UniqueList.java       |   6 +
 .../IncrementalExportEntityProviderTest.java    |  94 ++++++++++
 .../stocksDB-Entities/export-incremental.json   |   1 +
 7 files changed, 350 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
index 7bb8b76..e78bb53 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
@@ -80,18 +80,6 @@ public class AtlasExportRequest implements Serializable {
         this.options = options;
     }
 
-    public String getMatchTypeOptionValue() {
-        String matchType = null;
-
-        if (MapUtils.isNotEmpty(getOptions())) {
-            if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
-                matchType = 
getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
-            }
-        }
-
-        return matchType;
-    }
-
     public String getFetchTypeOptionValue() {
         if(getOptions() == null || 
!getOptions().containsKey(OPTION_FETCH_TYPE)) {
             return FETCH_TYPE_FULL;
@@ -122,6 +110,27 @@ public class AtlasExportRequest implements Serializable {
         return false;
     }
 
+    public String getMatchTypeOptionValue() {
+        String matchType = null;
+
+        if (MapUtils.isNotEmpty(getOptions())) {
+            if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
+                matchType = 
getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
+            }
+        }
+
+        return matchType;
+    }
+
+    public long getChangeTokenFromOptions() {
+        if(getFetchTypeOptionValue().equalsIgnoreCase(FETCH_TYPE_INCREMENTAL) 
&&
+                
getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER))
 {
+            return 
Long.parseLong(getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString());
+        }
+
+        return 0L;
+    }
+
     public StringBuilder toString(StringBuilder sb) {
         if (sb == null) {
             sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index f10d615..5e972a2 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -70,7 +70,7 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
 public class ExportService {
     private static final Logger LOG = 
LoggerFactory.getLogger(ExportService.class);
 
-    private static final String PROPERTY_GUID = "__guid";
+    public static final String PROPERTY_GUID = "__guid";
     private static final String PROPERTY_IS_PROCESS = "isProcess";
 
 
@@ -82,6 +82,8 @@ public class ExportService {
     private final AtlasGremlinQueryProvider gremlinQueryProvider;
     private       ExportTypeProcessor       exportTypeProcessor;
     private final HdfsPathEntityCreator     hdfsPathEntityCreator;
+    private       IncrementalExportEntityProvider 
incrementalExportEntityProvider;
+
     @Inject
     public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph 
atlasGraph,
                          AuditsWriter auditsWriter, HdfsPathEntityCreator 
hdfsPathEntityCreator) {
@@ -95,12 +97,12 @@ public class ExportService {
 
     public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest 
request, String userName, String hostName,
                                  String requestingIP) throws 
AtlasBaseException {
-        long              startTime = System.currentTimeMillis();
-        AtlasExportResult result    = new AtlasExportResult(request, userName, 
requestingIP,
+        long startTime = System.currentTimeMillis();
+        AtlasExportResult result = new AtlasExportResult(request, userName, 
requestingIP,
                 hostName, startTime, getCurrentChangeMarker());
 
-        ExportContext     context   = new ExportContext(atlasGraph, result, 
exportSink);
-                    exportTypeProcessor = new 
ExportTypeProcessor(typeRegistry, context);
+        ExportContext context = new ExportContext(atlasGraph, result, 
exportSink);
+        exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
 
         try {
             LOG.info("==> export(user={}, from={})", userName, requestingIP);
@@ -114,9 +116,11 @@ public class ExportService {
             LOG.error("Operation failed: ", ex);
         } finally {
             atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
-            LOG.info("<== export(user={}, from={}): status {}", userName, 
requestingIP, context.result.getOperationStatus());
+            LOG.info("<== export(user={}, from={}): status {}: changeMarker: 
{}",
+                    userName, requestingIP, 
context.result.getOperationStatus(), context.result.getChangeMarker());
             context.clear();
             result.clear();
+            incrementalExportEntityProvider = null;
         }
 
         return context.result;
@@ -177,7 +181,7 @@ public class ExportService {
         }
     }
 
-    private AtlasExportResult.OperationStatus[] 
processItems(AtlasExportRequest request, ExportContext context) throws 
AtlasServiceException, AtlasException, AtlasBaseException {
+    private AtlasExportResult.OperationStatus[] 
processItems(AtlasExportRequest request, ExportContext context) {
         AtlasExportResult.OperationStatus statuses[] = new 
AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
         List<AtlasObjectId> itemsToExport = request.getItemsToExport();
         for (int i = 0; i < itemsToExport.size(); i++) {
@@ -211,13 +215,14 @@ public class ExportService {
             }
 
             for (String guid : entityGuids) {
-                processEntity(guid, context);
+                processEntityGuid(guid, context);
+                populateEntitesForIncremental(guid, context);
             }
 
             while (!context.guidsToProcess.isEmpty()) {
                 while (!context.guidsToProcess.isEmpty()) {
                     String guid = context.guidsToProcess.remove(0);
-                    processEntity(guid, context);
+                    processEntityGuid(guid, context);
                 }
 
                 if (!context.lineageToProcess.isEmpty()) {
@@ -253,55 +258,61 @@ public class ExportService {
         if (StringUtils.isNotEmpty(item.getGuid())) {
             ret = Collections.singletonList(item.getGuid());
         } else if (StringUtils.equalsIgnoreCase(context.matchType, 
MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {
-            final String queryTemplate = getQueryTemplateForMatchType(context);
+            ret = getStartingEntityForMatchTypeForType(item, context);
+        } else if (StringUtils.isNotEmpty(item.getTypeName()) && 
MapUtils.isNotEmpty(item.getUniqueAttributes())) {
+            ret = getStartingEntityUsingQueryTemplate(item, context, ret);
+        }
 
-            setupBindingsForTypeName(context, item.getTypeName());
+        if (ret == null) {
+            ret = Collections.emptyList();
+        }
 
-            ret = executeGremlinQueryForGuids(queryTemplate, context);
-        } else if (StringUtils.isNotEmpty(item.getTypeName()) && 
MapUtils.isNotEmpty(item.getUniqueAttributes())) {
-            final String          queryTemplate = 
getQueryTemplateForMatchType(context);
-            final String          typeName      = item.getTypeName();
-            final AtlasEntityType entityType    = 
typeRegistry.getEntityTypeByName(typeName);
+        logInfoStartingEntitiesFound(item, context, ret);
+        return ret;
+    }
 
-            if (entityType == null) {
-                throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, 
typeName);
-            }
+    private List<String> getStartingEntityUsingQueryTemplate(AtlasObjectId 
item, ExportContext context, List<String> ret) throws AtlasBaseException {
+        final String          queryTemplate = 
getQueryTemplateForMatchType(context);
+        final String          typeName      = item.getTypeName();
+        final AtlasEntityType entityType    = 
typeRegistry.getEntityTypeByName(typeName);
 
-            for (Map.Entry<String, Object> e : 
item.getUniqueAttributes().entrySet()) {
-                String attrName  = e.getKey();
-                Object attrValue = e.getValue();
+        if (entityType == null) {
+            throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, 
typeName);
+        }
 
-                AtlasAttribute attribute = entityType.getAttribute(attrName);
-                if (attribute == null || attrValue == null) {
-                    continue;
-                }
+        for (Map.Entry<String, Object> e : 
item.getUniqueAttributes().entrySet()) {
+            String attrName  = e.getKey();
+            Object attrValue = e.getValue();
 
-                setupBindingsForTypeNameAttrNameAttrValue(context, typeName, 
attrValue, attribute);
+            AtlasAttribute attribute = entityType.getAttribute(attrName);
+            if (attribute == null || attrValue == null) {
+                continue;
+            }
 
-                List<String> guids = 
executeGremlinQueryForGuids(queryTemplate, context);
+            setupBindingsForTypeNameAttrNameAttrValue(context, typeName, 
attrValue, attribute);
 
-                if (CollectionUtils.isNotEmpty(guids)) {
-                    if (ret == null) {
-                        ret = new ArrayList<>();
-                    }
+            List<String> guids = executeGremlinQueryForGuids(queryTemplate, 
context);
 
-                    for (String guid : guids) {
-                        if (!ret.contains(guid)) {
-                            ret.add(guid);
-                        }
+            if (CollectionUtils.isNotEmpty(guids)) {
+                if (ret == null) {
+                    ret = new ArrayList<>();
+                }
+
+                for (String guid : guids) {
+                    if (!ret.contains(guid)) {
+                        ret.add(guid);
                     }
                 }
             }
         }
-
-        if (ret == null) {
-            ret = Collections.emptyList();
-        }
-
-        logInfoStartingEntitiesFound(item, context, ret);
         return ret;
     }
 
+    private List<String> getStartingEntityForMatchTypeForType(AtlasObjectId 
item, ExportContext context) {
+        setupBindingsForTypeName(context, item.getTypeName());
+        return 
executeGremlinQueryForGuids(getQueryTemplateForMatchType(context), context);
+    }
+
     private void logInfoStartingEntitiesFound(AtlasObjectId item, 
ExportContext context, List<String> ret) {
         LOG.info("export(item={}; matchType={}, fetchType={}): found {} 
entities: options: {}", item,
                 context.matchType, context.fetchType, ret.size(), 
AtlasType.toJson(context.result.getRequest()));
@@ -344,34 +355,43 @@ public class ExportService {
         return 
gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT);
     }
 
-    private void processEntity(String guid, ExportContext context) throws 
AtlasBaseException {
-        debugLog("==> processEntity({})", guid);
+    private void processEntityGuid(String guid, ExportContext context) throws 
AtlasBaseException {
+        debugLog("==> processEntityGuid({})", guid);
 
-        if (!context.guidsProcessed.contains(guid)) {
-            TraversalDirection      direction         = 
context.guidDirection.get(guid);
-            AtlasEntityWithExtInfo  entityWithExtInfo = 
entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+        if (context.guidsProcessed.contains(guid)) {
+            return;
+        }
+
+        TraversalDirection direction = context.guidDirection.get(guid);
+        AtlasEntityWithExtInfo entityWithExtInfo = 
entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+
+        processEntity(guid, entityWithExtInfo, context, direction);
+
+        debugLog("<== processEntityGuid({})", guid);
+    }
+
+    public void processEntity(String guid, AtlasEntityWithExtInfo 
entityWithExtInfo,
+                               ExportContext context,
+                               TraversalDirection direction) throws 
AtlasBaseException {
 
         if (!context.lineageProcessed.contains(guid) && 
context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
             
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
         }
 
-            addEntity(entityWithExtInfo, context);
-            exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), 
context);
+        addEntity(entityWithExtInfo, context);
+        exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
 
-            
context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
-            getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), 
context, direction);
+        context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
+        getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), 
context, direction);
 
-            if(entityWithExtInfo.getReferredEntities() != null) {
-                for (AtlasEntity e : 
entityWithExtInfo.getReferredEntities().values()) {
-                    exportTypeProcessor.addTypes(e, context);
-                    getConntedEntitiesBasedOnOption(e, context, direction);
-                }
-
-                
context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
+        if (entityWithExtInfo.getReferredEntities() != null) {
+            for (AtlasEntity e : 
entityWithExtInfo.getReferredEntities().values()) {
+                exportTypeProcessor.addTypes(e, context);
+                getConntedEntitiesBasedOnOption(e, context, direction);
             }
-        }
 
-        debugLog("<== processEntity({})", guid);
+            
context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
+        }
     }
 
     private void getConntedEntitiesBasedOnOption(AtlasEntity entity, 
ExportContext context, TraversalDirection direction) {
@@ -381,12 +401,25 @@ public class ExportService {
                 break;
 
             case INCREMENTAL:
+                if(context.isHiveDBIncrementalSkipLineage()) {
+                    break;
+                }
+
             case FULL:
             default:
                 getEntityGuidsForFullFetch(entity, context);
         }
     }
 
+    private void populateEntitesForIncremental(String topLevelEntityGuid, 
ExportContext context) throws AtlasBaseException {
+        if (context.isHiveDBIncrementalSkipLineage() == false || 
incrementalExportEntityProvider != null) {
+            return;
+        }
+
+        incrementalExportEntityProvider = new 
IncrementalExportEntityProvider(atlasGraph, context.scriptEngine);
+        incrementalExportEntityProvider.populate(topLevelEntityGuid, 
context.changeMarker, context.guidsToProcess);
+    }
+
     private void getEntityGuidsForConnectedFetch(AtlasEntity entity, 
ExportContext context, TraversalDirection direction) {
         if (direction == null || direction == TraversalDirection.UNKNOWN) {
             getConnectedEntityGuids(entity, context, 
TraversalDirection.OUTWARD, TraversalDirection.INWARD);
@@ -651,7 +684,7 @@ public class ExportService {
         }
     }
 
-    private enum TraversalDirection {
+    public enum TraversalDirection {
         UNKNOWN,
         INWARD,
         OUTWARD,
@@ -682,9 +715,11 @@ public class ExportService {
 
     static class ExportContext {
         private static final int REPORTING_THREASHOLD = 1000;
+        private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+
 
         final Set<String>                     guidsProcessed = new HashSet<>();
-        final UniqueList<String>              guidsToProcess = new 
UniqueList<>();
+        final private UniqueList<String>      guidsToProcess = new 
UniqueList<>();
         final UniqueList<String>              lineageToProcess = new 
UniqueList<>();
         final Set<String>                     lineageProcessed = new 
HashSet<>();
         final Map<String, TraversalDirection> guidDirection  = new HashMap<>();
@@ -700,7 +735,8 @@ public class ExportService {
         private final ExportFetchType     fetchType;
         private final String              matchType;
         private final boolean             skipLineage;
-        private final long changeMarker;
+        private final long                changeMarker;
+        private final boolean isHiveDBIncremental;
 
         private       int                 progressReportCount = 0;
 
@@ -713,16 +749,18 @@ public class ExportService {
             fetchType    = 
ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
             matchType    = result.getRequest().getMatchTypeOptionValue();
             skipLineage  = result.getRequest().getSkipLineageOptionValue();
-            this.changeMarker = getChangeTokenFromOptions(fetchType, 
result.getRequest());
+            this.changeMarker = 
result.getRequest().getChangeTokenFromOptions();
+            this.isHiveDBIncremental = 
checkHiveDBIncrementalSkipLineage(result.getRequest());
         }
 
-        private long getChangeTokenFromOptions(ExportFetchType fetchType, 
AtlasExportRequest request) {
-            if(fetchType == ExportFetchType.INCREMENTAL &&
-                    
request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER))
 {
-                return 
Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString());
+        private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest 
request) {
+            if(request.getItemsToExport().size() == 0) {
+                return false;
             }
 
-            return 0L;
+            return 
request.getItemsToExport().get(0).getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_DB)
 &&
+                    
request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL)
 &&
+                    request.getSkipLineageOptionValue();
         }
 
         public List<AtlasEntity> 
getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
@@ -752,12 +790,10 @@ public class ExportService {
         }
 
         public void addToBeProcessed(boolean isSuperTypeProcess, String guid, 
TraversalDirection direction) {
-            if(!isSuperTypeProcess) {
-                guidsToProcess.add(guid);
-            }
-
             if(isSuperTypeProcess) {
                 lineageToProcess.add(guid);
+            } else {
+                guidsToProcess.add(guid);
             }
 
             guidDirection.put(guid, direction);
@@ -786,5 +822,9 @@ public class ExportService {
         public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws 
AtlasBaseException {
             sink.add(entityWithExtInfo);
         }
+
+        public boolean isHiveDBIncrementalSkipLineage() {
+            return isHiveDBIncremental;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
index fddd60b..4a09c0f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/HdfsPathEntityCreator.java
@@ -22,8 +22,8 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
-import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
@@ -50,10 +50,10 @@ public class HdfsPathEntityCreator {
     private final String PATH_SEPARATOR = "/";
 
     private AtlasTypeRegistry typeRegistry;
-    private AtlasEntityStoreV1 entityStore;
+    private AtlasEntityStoreV2 entityStore;
 
     @Inject
-    public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, 
AtlasEntityStoreV1 entityStore) {
+    public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, 
AtlasEntityStoreV2 entityStore) {
         this.typeRegistry = typeRegistry;
         this.entityStore = entityStore;
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
new file mode 100644
index 0000000..3a2a917
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.util.UniqueList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class IncrementalExportEntityProvider {
+    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
+
+    private static final String QUERY_PARAMETER_START_GUID = "startGuid";
+    private static final String QUERY_PARAMETER_MODIFICATION_TIMESTAMP = 
"modificationTimestamp";
+
+    private AtlasGraph atlasGraph;
+
+    private static final String QUERY_DB = "g.V().has('__guid', startGuid)";
+    private static final String QUERY_TABLE = QUERY_DB + 
".in('__hive_table.db')";
+    private static final String QUERY_SD = QUERY_TABLE + 
".out('__hive_table.sd')";
+    private static final String QUERY_COLUMN = QUERY_TABLE + 
".out('__hive_table.columns')";
+    private static final String TRANSFORM_CLAUSE = 
".project('__guid').by('__guid').dedup().toList()";
+    private static final String TIMESTAMP_CLAUSE = 
".has('__modificationTimestamp', gt(modificationTimestamp))";
+
+    private ScriptEngine scriptEngine;
+
+    @Inject
+    public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine 
scriptEngine) {
+        this.atlasGraph = atlasGraph;
+        this.scriptEngine = scriptEngine;
+    }
+
+    public void populate(String dbEntityGuid, long timeStamp, 
UniqueList<String> guidsToProcess) {
+        if(timeStamp == 0L) {
+            full(dbEntityGuid, guidsToProcess);
+        } else {
+            partial(dbEntityGuid, timeStamp, guidsToProcess);
+        }
+    }
+
+    private void partial(String dbEntityGuid, long timeStamp, 
UniqueList<String> guidsToProcess) {
+        guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, 
timeStamp));
+        guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp));
+        guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_COLUMN, 
timeStamp));
+    }
+
+    private void full(String dbEntityGuid, UniqueList<String> guidsToProcess) {
+        guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, 0L));
+    }
+
+    private List<String> fetchGuids(final String dbEntityGuid, String query, 
long timeStamp) {
+        Map<String, Object> bindings = new HashMap<String, Object>() {{
+            put(QUERY_PARAMETER_START_GUID, dbEntityGuid);
+        }};
+
+        String queryWithClause = query;
+        if(timeStamp > 0L) {
+            bindings.put(QUERY_PARAMETER_MODIFICATION_TIMESTAMP, timeStamp);
+            queryWithClause = queryWithClause.concat(TIMESTAMP_CLAUSE);
+        }
+
+        return executeGremlinQuery(queryWithClause, bindings);
+    }
+
+    private List<String> executeGremlinQuery(String query, Map<String, Object> 
bindings) {
+        try {
+            List<String> guids = new ArrayList<>();
+            String queryWithTransform = query + TRANSFORM_CLAUSE;
+            List<Map<String, Object>> result = (List<Map<String, Object>>)
+                    atlasGraph.executeGremlinScript(scriptEngine, bindings, 
queryWithTransform, false);
+            if (result == null) {
+                return guids;
+            }
+
+            for (Map<String, Object> item : result) {
+                guids.add((String) item.get(ExportService.PROPERTY_GUID));
+            }
+
+            return guids;
+
+        } catch (ScriptException e) {
+            LOG.error("error executing query: {}: bindings: {}", query, 
bindings, e);
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java
index 9148ce0..eebbc4e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java
+++ b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java
@@ -44,6 +44,12 @@ public class UniqueList<T> {
         }
     }
 
+    public void addAll(List<T> list) {
+        for (T item : list) {
+            add(item);
+        }
+    }
+
     public T remove(int index) {
         T e = list.remove(index);
         set.remove(e);

http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
new file mode 100644
index 0000000..de0a8f8
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.util.UniqueList;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import javax.script.ScriptEngine;
+import java.io.IOException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
+    @Inject
+    AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private AtlasEntityStoreV2 entityStore;
+
+    @Inject
+    private AtlasGraph atlasGraph;
+
+    private IncrementalExportEntityProvider incrementalExportEntityProvider;
+    private ScriptEngine gremlinScriptEngine;
+
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        basicSetup(typeDefStore, typeRegistry);
+        createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", 
"table-columns"});
+        final String[] entityGuids = {DB_GUID, TABLE_GUID};
+        verifyCreatedEntities(entityStore, entityGuids, 2);
+
+        gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
+        EntityGraphRetriever entityGraphRetriever = new 
EntityGraphRetriever(this.typeRegistry);
+        incrementalExportEntityProvider = new 
IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
+    }
+
+    @AfterClass
+    public void tearDown() {
+        if(gremlinScriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(gremlinScriptEngine);
+        }
+    }
+
+    @Test
+    public void verify() {
+        executeQueries(0L, 1);
+        executeQueries(1L, 9);
+    }
+
+    private void executeQueries(long timeStamp, int expectedEntityCount) {
+        UniqueList<String> uniqueList = new UniqueList<>();
+        incrementalExportEntityProvider.populate(DB_GUID, timeStamp, 
uniqueList);
+
+        for (String g : uniqueList.getList()) {
+            assertTrue(g instanceof String);
+        }
+
+        assertEquals(uniqueList.size(), expectedEntityCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/558fe964/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
----------------------------------------------------------------------
diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
index fdd3b01..ba125e7 100644
--- a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
+++ b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
@@ -6,6 +6,7 @@
   ],
   "options": {
     "fetchType": "incremental",
+    "skipLineage": "true",
     "changeMarker": 0
   }
 }

Reply via email to