ATLAS-1919: Refactored ZipSink to record committed guids

Signed-off-by: Madhan Neethiraj <mad...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/3962057c
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/3962057c
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/3962057c

Branch: refs/heads/feature-odf
Commit: 3962057c640e1fbecb4cfb6a785475e441a7de1d
Parents: bcabde9
Author: ashutoshm <ames...@hortonworks.com>
Authored: Mon Jul 10 08:25:58 2017 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Mon Jul 17 10:12:18 2017 -0700

----------------------------------------------------------------------
 .../atlas/repository/impexp/ExportService.java  |  4 +
 .../apache/atlas/repository/impexp/ZipSink.java | 21 ++++++
 .../atlas/repository/impexp/ZipSinkTest.java    | 79 +++++++++++++++++---
 3 files changed, 93 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/3962057c/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 3538cfd..8f45e9f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -433,6 +433,10 @@ public class ExportService {
     }
 
     private void addEntity(AtlasEntityWithExtInfo entity, ExportContext 
context) throws AtlasBaseException {
+        if(context.sink.hasEntity(entity.getEntity().getGuid())) {
+            return;
+        }
+
         context.sink.add(entity);
 
         context.result.incrementMeticsCounter(String.format("entity:%s", 
entity.getEntity().getTypeName()));

http://git-wip-us.apache.org/repos/asf/atlas/blob/3962057c/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
index 4bb04da..17ebbf1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
@@ -27,7 +27,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
@@ -35,6 +37,8 @@ public class ZipSink {
     private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
 
     private ZipOutputStream zipOutputStream;
+    final Set<String>       guids = new HashSet<>();
+
 
     public ZipSink(OutputStream outputStream) {
         zipOutputStream = new ZipOutputStream(outputStream);
@@ -43,11 +47,13 @@ public class ZipSink {
     public void add(AtlasEntity entity) throws AtlasBaseException {
         String jsonData = convertToJSON(entity);
         saveToZip(entity.getGuid(), jsonData);
+        recordAddedEntityGuids(entity);
     }
 
     public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) 
throws AtlasBaseException {
         String jsonData = convertToJSON(entityWithExtInfo);
         saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData);
+        recordAddedEntityGuids(entityWithExtInfo);
     }
 
     public void setResult(AtlasExportResult result) throws AtlasBaseException {
@@ -100,4 +106,19 @@ public class ZipSink {
         zipOutputStream.write(payload.getBytes());
         zipOutputStream.closeEntry();
     }
+
+    public boolean hasEntity(String guid) {
+        return guids.contains(guid);
+    }
+
+    private void recordAddedEntityGuids(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo) {
+        guids.add(entityWithExtInfo.getEntity().getGuid());
+        if(entityWithExtInfo.getReferredEntities() != null) {
+            guids.addAll(entityWithExtInfo.getReferredEntities().keySet());
+        }
+    }
+
+    private void recordAddedEntityGuids(AtlasEntity entity) {
+        guids.add(entity.getGuid());
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/3962057c/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
index 635caf7..e8bbeb5 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.atlas.repository.impexp;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.type.AtlasType;
 import org.testng.Assert;
@@ -35,11 +36,15 @@ import java.util.List;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
+import static org.testng.Assert.*;
+
 public class ZipSinkTest {
     private ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
     private ZipSink zipSink;
     private List<String> defaultExportOrder = new 
ArrayList<>(Arrays.asList("a", "b", "c", "d"));
     private AtlasExportResult defaultExportResult;
+    private String knownEntityGuidFormat = "111-222-333-%s";
+
 
     private void initZipSinkWithExportOrder() throws AtlasBaseException {
         zipSink = new ZipSink(byteArrayOutputStream);
@@ -80,7 +85,7 @@ public class ZipSinkTest {
     @Test
     public void correctInit_succeeds() throws AtlasBaseException {
         initZipSinkWithExportOrder();
-        Assert.assertTrue(true);
+        assertTrue(true);
         Assert.assertNotNull(zipSink);
     }
 
@@ -95,11 +100,11 @@ public class ZipSinkTest {
                 Assert.assertNull(zis.getNextEntry());
             } catch (IOException e) {
 
-                Assert.assertTrue(false);
+                assertTrue(false);
             }
         } catch (AtlasBaseException e) {
 
-            Assert.assertTrue(false, "No exception should be thrown.");
+            assertTrue(false, "No exception should be thrown.");
         }
     }
 
@@ -109,7 +114,7 @@ public class ZipSinkTest {
         ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
         ZipEntry ze = zis.getNextEntry();
 
-        Assert.assertEquals(ze.getName().replace(".json", ""), 
ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
+        assertEquals(ze.getName().replace(".json", ""), 
ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
     }
 
     @Test
@@ -118,28 +123,80 @@ public class ZipSinkTest {
         ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
         zis.getNextEntry();
 
-        Assert.assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), 
"['a','b','c','d']");
+        assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), 
"['a','b','c','d']");
     }
 
     @Test
     public void zipWithExactlyTwoEntries_ContentsVerified() throws 
AtlasBaseException, IOException {
 
         ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
-        useZipSinkToCreateZipWithTwoEntries(byteOutputStream);
+        useZipSinkToCreateEntries(byteOutputStream);
 
         ByteArrayInputStream bis = new 
ByteArrayInputStream(byteOutputStream.toByteArray());
         ZipInputStream zipStream = new ZipInputStream(bis);
         ZipEntry entry = zipStream.getNextEntry();
 
-        Assert.assertEquals(getZipEntryAsStream(zipStream), 
"[\"a\",\"b\",\"c\",\"d\"]");
-        Assert.assertEquals(entry.getName().replace(".json", ""), 
ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
+        assertEquals(getZipEntryAsStream(zipStream), 
"[\"a\",\"b\",\"c\",\"d\"]");
+        assertEquals(entry.getName().replace(".json", ""), 
ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
 
         entry = zipStream.getNextEntry();
-        Assert.assertEquals(entry.getName().replace(".json", ""), 
ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString());
-        
Assert.assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), 
defaultExportResult));
+        assertEquals(entry.getName().replace(".json", ""), 
ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString());
+        assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), 
defaultExportResult));
+    }
+
+    @Test
+    public void recordsEntityEntries() throws AtlasBaseException {
+        ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+        ZipSink zs = new ZipSink(byteOutputStream);
+
+        AtlasEntity entity = new AtlasEntity();
+        entity.setGuid(String.format(knownEntityGuidFormat, 0));
+
+        zs.add(entity);
+        assertTrue(zs.hasEntity(String.format(knownEntityGuidFormat, 0)));
+
+        zs.close();
+    }
+
+    @Test
+    public void recordsEntityWithExtInfoEntries() throws AtlasBaseException {
+        final int max_entries = 3;
+        ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+        ZipSink zs = new ZipSink(byteOutputStream);
+
+        AtlasEntity entity = new AtlasEntity();
+        entity.setGuid(String.format(knownEntityGuidFormat, 0));
+
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new 
AtlasEntity.AtlasEntityWithExtInfo(entity);
+        addReferredEntities(entityWithExtInfo, max_entries);
+
+        zs.add(entityWithExtInfo);
+        for (int i = 0; i <= max_entries; i++) {
+            String g = String.format(knownEntityGuidFormat, i);
+            assertTrue(zs.hasEntity(g));
+        }
+
+        zs.close();
+    }
+
+    private void addReferredEntities(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo, int maxEntries) {
+
+        for (int i = 1; i <= maxEntries; i++) {
+            AtlasEntity entity1 = new AtlasEntity();
+            entity1.setGuid(String.format(knownEntityGuidFormat, i));
+            entityWithExtInfo.addReferredEntity(entity1);
+        }
+    }
+
+    @Test
+    public void recordsDoesNotRecordEntityEntries() throws AtlasBaseException {
+        initZipSinkWithExportOrder();
+
+        assertNotNull(zipSink);
+        
assertFalse(zipSink.hasEntity(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString()));
     }
 
-    private void useZipSinkToCreateZipWithTwoEntries(ByteArrayOutputStream 
byteOutputStream) throws AtlasBaseException {
+    private void useZipSinkToCreateEntries(ByteArrayOutputStream 
byteOutputStream) throws AtlasBaseException {
         ZipSink zs = new ZipSink(byteOutputStream);
         zs.setExportOrder(defaultExportOrder);
         zs.setResult(getDefaultExportResult());

Reply via email to