[47/50] nifi git commit: NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot
NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot Add multiple states recover Signed-off-by: Matthew Burgess. This closes #2442 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd8f1bf9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd8f1bf9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd8f1bf9 Branch: refs/heads/HDF-3.1-maint Commit: fd8f1bf9083d2baf4d43e0666c774a8d5f83e2d8 Parents: b0142dc Author: Deon Huang Authored: Mon Jan 29 23:29:21 2018 +0800 Committer: Matt Gilman Committed: Thu May 10 11:11:43 2018 -0400 -- .../processors/standard/GenerateTableFetch.java | 8 +-- .../standard/TestGenerateTableFetch.java| 61 2 files changed, 63 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/fd8f1bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 8f535b3..188e282 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -245,7 +245,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { -if(columnTypeMap.isEmpty()){ +if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){ // This means column 
type cache is clean after instance reboot. We should re-cache column type super.setup(context, false, finalFileToProcess); } @@ -327,7 +327,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { -if(columnTypeMap.isEmpty()){ +if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){ // This means column type cache is clean after instance reboot. We should re-cache column type super.setup(context, false, finalFileToProcess); } @@ -419,10 +419,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { // If the table name is static and the fully-qualified key was not found, try just the column name type = columnTypeMap.get(getStateKey(null, colName)); } -if (type == null) { -// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed -throw new ProcessException("No column type cache found for: " + colName); -} return type; } http://git-wip-us.apache.org/repos/asf/nifi/blob/fd8f1bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 117b47e..253f4d0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -1191,6 +1191,67 @@ public 
class TestGenerateTableFetch { runner.clearTransferState(); } +@Test +public void testMultipleColumnTypeMissing() throws
nifi git commit: NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot
Repository: nifi Updated Branches: refs/heads/master 9bc00b6b6 -> b5938062a NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot Add multiple states recover Signed-off-by: Matthew Burgess. This closes #2442 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b5938062 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b5938062 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b5938062 Branch: refs/heads/master Commit: b5938062a8ff0d7c8bb2c1985dc2605b2a69b6dd Parents: 9bc00b6 Author: Deon Huang Authored: Mon Jan 29 23:29:21 2018 +0800 Committer: Matthew Burgess Committed: Fri Feb 2 11:15:46 2018 -0500 -- .../processors/standard/GenerateTableFetch.java | 8 +-- .../standard/TestGenerateTableFetch.java| 61 2 files changed, 63 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/b5938062/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 8f535b3..188e282 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -245,7 +245,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { -if(columnTypeMap.isEmpty()){ 
+if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){ // This means column type cache is clean after instance reboot. We should re-cache column type super.setup(context, false, finalFileToProcess); } @@ -327,7 +327,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { -if(columnTypeMap.isEmpty()){ +if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){ // This means column type cache is clean after instance reboot. We should re-cache column type super.setup(context, false, finalFileToProcess); } @@ -419,10 +419,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { // If the table name is static and the fully-qualified key was not found, try just the column name type = columnTypeMap.get(getStateKey(null, colName)); } -if (type == null) { -// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed -throw new ProcessException("No column type cache found for: " + colName); -} return type; } http://git-wip-us.apache.org/repos/asf/nifi/blob/b5938062/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 67a9bad..f20dee8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -1095,6 +1095,67 @@ public class TestGenerateTableFetch { runner.clearTransferState();
nifi git commit: NIFI-4395 - GenerateTableFetch can't fetch column type by state after instance reboot
Repository: nifi Updated Branches: refs/heads/master 9e2c7be7d -> a29348f2a NIFI-4395 - GenerateTableFetch can't fetch column type by state after instance reboot NIFI-4395: Updated unit test for GenerateTableFetch Signed-off-by: Matthew Burgess. This closes #2166 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a29348f2 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a29348f2 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a29348f2 Branch: refs/heads/master Commit: a29348f2a4d8ec6576f1ec73c57911b827d46315 Parents: 9e2c7be Author: Deon Huang Authored: Thu Sep 21 15:34:00 2017 +0800 Committer: Matthew Burgess Committed: Fri Sep 22 14:34:00 2017 -0400 -- .../AbstractDatabaseFetchProcessor.java | 13 +++-- .../processors/standard/GenerateTableFetch.java | 25 ++--- .../standard/TestGenerateTableFetch.java| 55 3 files changed, 82 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/a29348f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 1f26976..fa2a86e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -22,6 +22,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.dbcp.DBCPService; import 
org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; @@ -222,7 +223,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact } public void setup(final ProcessContext context) { -final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); +setup(context,true,null); +} + +public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) { +final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue(); // If there are no max-value column names specified, we don't need to perform this processing if (StringUtils.isEmpty(maxValueColumnNames)) { @@ -231,7 +236,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact // Try to fill the columnTypeMap with the types of the desired max-value columns final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); -final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); +final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); try (final Connection con = dbcpService.getConnection(); @@ -245,7 +250,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); int numCols = resultSetMetaData.getColumnCount(); if (numCols > 0) { -columnTypeMap.clear(); +if (shouldCleanCache){ +columnTypeMap.clear(); +} for (int i = 1; i <= numCols; i++) { String colName = resultSetMetaData.getColumnName(i).toLowerCase(); String 
colKey = getStateKey(tableName, colName); http://git-wip-us.apache.org/repos/asf/nifi/blob/a29348f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java -- diff --git