NIFI-4395 GenerateTableFetch cannot fetch column type from state after instance reboot

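Re-cache a column type whenever it is missing from the column type cache (not only when the whole cache is empty), so that state stored for multiple tables can be recovered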

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #2442


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd8f1bf9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd8f1bf9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd8f1bf9

Branch: refs/heads/HDF-3.1-maint
Commit: fd8f1bf9083d2baf4d43e0666c774a8d5f83e2d8
Parents: b0142dc
Author: Deon Huang <yjhyjhy...@gmail.com>
Authored: Mon Jan 29 23:29:21 2018 +0800
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Thu May 10 11:11:43 2018 -0400

----------------------------------------------------------------------
 .../processors/standard/GenerateTableFetch.java |  8 +--
 .../standard/TestGenerateTableFetch.java        | 61 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fd8f1bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 8f535b3..188e282 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -245,7 +245,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                 String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName);
                 if (!StringUtils.isEmpty(maxValue)) {
-                    if(columnTypeMap.isEmpty()){
+                    if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName) == null){
                         // This means column type cache is clean after 
instance reboot. We should re-cache column type
                         super.setup(context, false, finalFileToProcess);
                     }
@@ -327,7 +327,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                     maxValueSelectColumns.add("MAX(" + colName + ") " + 
colName);
                     String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName);
                     if (!StringUtils.isEmpty(maxValue)) {
-                        if(columnTypeMap.isEmpty()){
+                        if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName) == null){
                             // This means column type cache is clean after 
instance reboot. We should re-cache column type
                             super.setup(context, false, finalFileToProcess);
                         }
@@ -419,10 +419,6 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
             // If the table name is static and the fully-qualified key was not 
found, try just the column name
             type = columnTypeMap.get(getStateKey(null, colName));
         }
-        if (type == null) {
-            // This shouldn't happen as we are populating columnTypeMap when 
the processor is scheduled or when the first maximum is observed
-            throw new ProcessException("No column type cache found for: " + 
colName);
-        }
 
         return type;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd8f1bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 117b47e..253f4d0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -1191,6 +1191,67 @@ public class TestGenerateTableFetch {
         runner.clearTransferState();
     }
 
+    /**
+     * Verifies that GenerateTableFetch re-caches a column type that is missing from the column type cache even when
+     * the cache still holds entries for other tables (NIFI-4395). Previously the cache was only re-populated when it
+     * was completely empty.
+     */
+    @Test
+    public void testMultipleColumnTypeMissing() throws ClassNotFoundException, SQLException, InitializationException, IOException {
+
+        // Load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        // Drop each table in its own try block so that failing to drop the first one (e.g. because it does not
+        // exist) does not skip dropping the second one. Derby doesn't yet support DROP IF EXISTS [DERBY-4842].
+        for (final String table : new String[] {"TEST_QUERY_DB_TABLE", "TEST_QUERY_DB_TABLE_2"}) {
+            try {
+                stmt.execute("drop table " + table);
+            } catch (final SQLException ignored) {
+                // Ignore this error, probably a "table does not exist"
+            }
+        }
+
+        // Create multiple tables so that processor state (and a cached column type) is stored for each of them
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
+        stmt.execute("create table TEST_QUERY_DB_TABLE_2 (id integer not null, bucket integer not null)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE_2 (id, bucket) VALUES (1, 0)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
+        runner.setIncomingConnection(true);
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}");
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE");
+            put("maxValueCol", "id");
+        }});
+
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE_2");
+            put("maxValueCol", "id");
+        }});
+        runner.run(2);
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
+
+        // One cached column type per table
+        assertEquals(2, processor.columnTypeMap.size());
+        runner.clearTransferState();
+
+        // Evict only TEST_QUERY_DB_TABLE's cached column type to simulate a partially populated cache. Evicting
+        // "the first" map entry instead would depend on HashMap iteration order and could remove the other
+        // table's entry, which re-running TEST_QUERY_DB_TABLE below would never restore.
+        processor.columnTypeMap.remove(GenerateTableFetch.getStateKey("TEST_QUERY_DB_TABLE", "id"));
+        assertEquals(1, processor.columnTypeMap.size());
+
+        // Insert new records
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
+
+        // Enqueue another FlowFile for the table whose column type was evicted, to see if it gets re-cached
+        runner.enqueue("".getBytes(), new HashMap<String, String>() {{
+            put("tableName", "TEST_QUERY_DB_TABLE");
+            put("maxValueCol", "id");
+        }});
+
+        // It should re-cache the missing column type rather than fail
+        runner.run();
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        assertEquals(2, processor.columnTypeMap.size());
+        runner.clearTransferState();
+    }
+
     /**
      * Simple implementation only for GenerateTableFetch processor testing.
      */

Reply via email to