NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot
Add multiple states recover Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #2442 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd8f1bf9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd8f1bf9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd8f1bf9 Branch: refs/heads/HDF-3.1-maint Commit: fd8f1bf9083d2baf4d43e0666c774a8d5f83e2d8 Parents: b0142dc Author: Deon Huang <yjhyjhy...@gmail.com> Authored: Mon Jan 29 23:29:21 2018 +0800 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Thu May 10 11:11:43 2018 -0400 ---------------------------------------------------------------------- .../processors/standard/GenerateTableFetch.java | 8 +-- .../standard/TestGenerateTableFetch.java | 61 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fd8f1bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 8f535b3..188e282 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -245,7 +245,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = 
getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { - if(columnTypeMap.isEmpty()){ + if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){ // This means column type cache is clean after instance reboot. We should re-cache column type super.setup(context, false, finalFileToProcess); } @@ -327,7 +327,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { - if(columnTypeMap.isEmpty()){ + if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){ // This means column type cache is clean after instance reboot. We should re-cache column type super.setup(context, false, finalFileToProcess); } @@ -419,10 +419,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { // If the table name is static and the fully-qualified key was not found, try just the column name type = columnTypeMap.get(getStateKey(null, colName)); } - if (type == null) { - // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed - throw new ProcessException("No column type cache found for: " + colName); - } return type; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fd8f1bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 117b47e..253f4d0 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -1191,6 +1191,67 @@ public class TestGenerateTableFetch { runner.clearTransferState(); } + @Test + public void testMultipleColumnTypeMissing() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // Load test data into the database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + stmt.execute("drop table TEST_QUERY_DB_TABLE_2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + // Create multiple tables so that the processor stores state for each of them + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + stmt.execute("create table TEST_QUERY_DB_TABLE_2 (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE_2 (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}"); + + runner.enqueue("".getBytes(), new HashMap<String, String>() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + + runner.enqueue("".getBytes(), new HashMap<String, String>() {{ + put("tableName", "TEST_QUERY_DB_TABLE_2"); + put("maxValueCol", "id"); + }}); + runner.run(2); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2); + + assertEquals(2,processor.columnTypeMap.size()); + 
runner.clearTransferState(); + + + // Remove one entry from columnTypeMap to simulate a partially populated column type cache + Map.Entry<String,Integer> entry = processor.columnTypeMap.entrySet().iterator().next(); + String key = entry.getKey(); + processor.columnTypeMap.remove(key); + + // Insert new records + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + + // Re-launch FlowFile to see if re-caching the column type works + runner.enqueue("".getBytes(), new HashMap<String, String>() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + + // It should re-cache column type + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + assertEquals(2,processor.columnTypeMap.size()); + runner.clearTransferState(); + } + /** * Simple implementation only for GenerateTableFetch processor testing. */