NIFI-259: Update GetHBase to use new State Management feature; updated docs; bug fixes


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d39067ed
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d39067ed
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d39067ed

Branch: refs/heads/master
Commit: d39067ede6b7ef03b211e2d040db244018fc510f
Parents: bbce596
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Jan 13 13:30:05 2016 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Jan 13 13:30:05 2016 -0500

----------------------------------------------------------------------
 .../standard/AbstractListProcessor.java         | 21 ++++++++---
 .../standard/TestAbstractListProcessor.java     | 39 ++++++++++++++++++++
 2 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d39067ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index efe551f..494f227 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -363,11 +363,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             return;
         }
 
-        int listCount = 0;
         Long latestListingTimestamp = null;
+        final List<T> newEntries = new ArrayList<>();
         for (final T entity : entityList) {
-            final boolean list = (minTimestamp == null || 
entity.getTimestamp() > minTimestamp
-                || (entity.getTimestamp() == minTimestamp && 
!latestIdentifiersListed.contains(entity.getIdentifier())));
+            final boolean newTimestamp = minTimestamp == null || 
entity.getTimestamp() > minTimestamp;
+            final boolean newEntryForTimestamp = minTimestamp != null && 
entity.getTimestamp() == minTimestamp && 
!latestIdentifiersListed.contains(entity.getIdentifier());
+            final boolean list = newTimestamp || newEntryForTimestamp;
 
             // Create the FlowFile for this path.
             if (list) {
@@ -375,7 +376,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 FlowFile flowFile = session.create();
                 flowFile = session.putAllAttributes(flowFile, attributes);
                 session.transfer(flowFile, REL_SUCCESS);
-                listCount++;
+
+                // If we don't have a new timestamp but just have a new entry, 
we need to
+                // add all of the previous entries to our entityList. If we 
have a new timestamp,
+                // then the previous entries can go away.
+                if (!newTimestamp) {
+                    newEntries.addAll(entityList);
+                }
+                newEntries.add(entity);
 
                 if (latestListingTimestamp == null || entity.getTimestamp() > 
latestListingTimestamp) {
                     latestListingTimestamp = entity.getTimestamp();
@@ -383,6 +391,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             }
         }
 
+        final int listCount = newEntries.size();
         if (listCount > 0) {
             getLogger().info("Successfully created listing with {} new 
objects", new Object[] {listCount});
             session.commit();
@@ -395,9 +404,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             // previously Primary Node left off.
             // We also store the state locally so that if the node is 
restarted, and the node cannot contact
             // the distributed state cache, the node can continue to run (if 
it is primary node).
-            final Set<String> identifiers = new HashSet<>(entityList.size());
+            final Set<String> identifiers = new HashSet<>(newEntries.size());
             try {
-                for (final T entity : entityList) {
+                for (final T entity : newEntries) {
                     identifiers.add(entity.getIdentifier());
                 }
                 persist(latestListingTimestamp, identifiers, 
context.getStateManager());

http://git-wip-us.apache.org/repos/asf/nifi/blob/d39067ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index 1da2b4d..3a432e7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,6 +31,7 @@ import java.util.UUID;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -154,6 +156,43 @@ public class TestAbstractListProcessor {
         assertEquals(1, cache.fetchCount);
     }
 
+    @Test
+    public void testOnlyNewStateStored() throws IOException { // Verifies that a listing at a newer timestamp replaces (rather than accumulates) the identifiers stored in cluster state.
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run(); // no entities registered yet
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); // so nothing is listed
+        proc.addEntity("name", "id", 1492L);
+        proc.addEntity("name", "id2", 1492L); // two distinct identifiers sharing one timestamp
+
+        runner.run(); // first listing with entities: both are new
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        final StateMap stateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(1, stateMap.getVersion());
+
+        final Map<String, String> map = stateMap.toMap();
+        assertEquals(3, map.size()); // the "timestamp" entry plus one "id.N" entry per listed identifier
+        assertEquals("1492", map.get("timestamp"));
+        assertTrue(map.containsKey("id.1"));
+        assertTrue(map.containsKey("id.2"));
+
+        proc.addEntity("new name", "new id", 1493L); // NOTE(review): only a strictly newer timestamp is exercised here; a new identifier at the already-listed timestamp (the !newTimestamp branch, which calls newEntries.addAll(entityList) and so feeds listCount and the persisted identifiers) is never tested - consider adding that case.
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); // only the 1493 entity is new
+        final StateMap updatedStateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(2, updatedStateMap.getVersion());
+
+        final Map<String, String> updatedValues = updatedStateMap.toMap();
+        assertEquals(2, updatedValues.size()); // identifiers from the 1492 listing must no longer be stored
+        assertEquals("1493", updatedValues.get("timestamp"));
+        assertEquals("new id", updatedValues.get("id.1"));
+    }
+
+
     private static class DistributedCache extends AbstractControllerService 
implements DistributedMapCacheClient {
         private final Map<Object, Object> stored = new HashMap<>();
         private int fetchCount = 0;

Reply via email to