[47/50] nifi git commit: NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot

2018-05-10 Thread aldrin
NIFI-4395 GenerateTableFetch can't fetch column type by state after instance 
reboot

Add recovery for multiple states

Signed-off-by: Matthew Burgess 

This closes #2442


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fd8f1bf9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fd8f1bf9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fd8f1bf9

Branch: refs/heads/HDF-3.1-maint
Commit: fd8f1bf9083d2baf4d43e0666c774a8d5f83e2d8
Parents: b0142dc
Author: Deon Huang 
Authored: Mon Jan 29 23:29:21 2018 +0800
Committer: Matt Gilman 
Committed: Thu May 10 11:11:43 2018 -0400

--
 .../processors/standard/GenerateTableFetch.java |  8 +--
 .../standard/TestGenerateTableFetch.java| 61 
 2 files changed, 63 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/fd8f1bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 8f535b3..188e282 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -245,7 +245,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
 String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName);
 if (!StringUtils.isEmpty(maxValue)) {
-if(columnTypeMap.isEmpty()){
+if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName) == null){
 // This means column type cache is clean after 
instance reboot. We should re-cache column type
 super.setup(context, false, finalFileToProcess);
 }
@@ -327,7 +327,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
 maxValueSelectColumns.add("MAX(" + colName + ") " + 
colName);
 String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName);
 if (!StringUtils.isEmpty(maxValue)) {
-if(columnTypeMap.isEmpty()){
+if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName) == null){
 // This means column type cache is clean after 
instance reboot. We should re-cache column type
 super.setup(context, false, finalFileToProcess);
 }
@@ -419,10 +419,6 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
 // If the table name is static and the fully-qualified key was not 
found, try just the column name
 type = columnTypeMap.get(getStateKey(null, colName));
 }
-if (type == null) {
-// This shouldn't happen as we are populating columnTypeMap when 
the processor is scheduled or when the first maximum is observed
-throw new ProcessException("No column type cache found for: " + 
colName);
-}
 
 return type;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/fd8f1bf9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 117b47e..253f4d0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -1191,6 +1191,67 @@ public class TestGenerateTableFetch {
 runner.clearTransferState();
 }
 
+@Test
+public void testMultipleColumnTypeMissing() throws 

nifi git commit: NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot

2018-02-02 Thread mattyb149
Repository: nifi
Updated Branches:
  refs/heads/master 9bc00b6b6 -> b5938062a


NIFI-4395 GenerateTableFetch can't fetch column type by state after instance 
reboot

Add recovery for multiple states

Signed-off-by: Matthew Burgess 

This closes #2442


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b5938062
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b5938062
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b5938062

Branch: refs/heads/master
Commit: b5938062a8ff0d7c8bb2c1985dc2605b2a69b6dd
Parents: 9bc00b6
Author: Deon Huang 
Authored: Mon Jan 29 23:29:21 2018 +0800
Committer: Matthew Burgess 
Committed: Fri Feb 2 11:15:46 2018 -0500

--
 .../processors/standard/GenerateTableFetch.java |  8 +--
 .../standard/TestGenerateTableFetch.java| 61 
 2 files changed, 63 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/b5938062/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 8f535b3..188e282 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -245,7 +245,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
 String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName);
 if (!StringUtils.isEmpty(maxValue)) {
-if(columnTypeMap.isEmpty()){
+if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName) == null){
 // This means column type cache is clean after 
instance reboot. We should re-cache column type
 super.setup(context, false, finalFileToProcess);
 }
@@ -327,7 +327,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
 maxValueSelectColumns.add("MAX(" + colName + ") " + 
colName);
 String maxValue = getColumnStateMaxValue(tableName, 
statePropertyMap, colName);
 if (!StringUtils.isEmpty(maxValue)) {
-if(columnTypeMap.isEmpty()){
+if(columnTypeMap.isEmpty() || getColumnType(tableName, 
colName) == null){
 // This means column type cache is clean after 
instance reboot. We should re-cache column type
 super.setup(context, false, finalFileToProcess);
 }
@@ -419,10 +419,6 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
 // If the table name is static and the fully-qualified key was not 
found, try just the column name
 type = columnTypeMap.get(getStateKey(null, colName));
 }
-if (type == null) {
-// This shouldn't happen as we are populating columnTypeMap when 
the processor is scheduled or when the first maximum is observed
-throw new ProcessException("No column type cache found for: " + 
colName);
-}
 
 return type;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5938062/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 67a9bad..f20dee8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -1095,6 +1095,67 @@ public class TestGenerateTableFetch {
 runner.clearTransferState();
 

nifi git commit: NIFI-4395 - GenerateTableFetch can't fetch column type by state after instance reboot

2017-09-22 Thread mattyb149
Repository: nifi
Updated Branches:
  refs/heads/master 9e2c7be7d -> a29348f2a


NIFI-4395 - GenerateTableFetch can't fetch column type by state after instance 
reboot

NIFI-4395: Updated unit test for GenerateTableFetch
Signed-off-by: Matthew Burgess 

This closes #2166


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a29348f2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a29348f2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a29348f2

Branch: refs/heads/master
Commit: a29348f2a4d8ec6576f1ec73c57911b827d46315
Parents: 9e2c7be
Author: Deon Huang 
Authored: Thu Sep 21 15:34:00 2017 +0800
Committer: Matthew Burgess 
Committed: Fri Sep 22 14:34:00 2017 -0400

--
 .../AbstractDatabaseFetchProcessor.java | 13 +++--
 .../processors/standard/GenerateTableFetch.java | 25 ++---
 .../standard/TestGenerateTableFetch.java| 55 
 3 files changed, 82 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/a29348f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 1f26976..fa2a86e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
@@ -222,7 +223,11 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
 }
 
 public void setup(final ProcessContext context) {
-final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+setup(context,true,null);
+}
+
+public void setup(final ProcessContext context, boolean shouldCleanCache, 
FlowFile flowFile) {
+final String maxValueColumnNames = 
context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
 
 // If there are no max-value column names specified, we don't need to 
perform this processing
 if (StringUtils.isEmpty(maxValueColumnNames)) {
@@ -231,7 +236,7 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
 
 // Try to fill the columnTypeMap with the types of the desired 
max-value columns
 final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
 
 final DatabaseAdapter dbAdapter = 
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
 try (final Connection con = dbcpService.getConnection();
@@ -245,7 +250,9 @@ public abstract class AbstractDatabaseFetchProcessor 
extends AbstractSessionFact
 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
 int numCols = resultSetMetaData.getColumnCount();
 if (numCols > 0) {
-columnTypeMap.clear();
+if (shouldCleanCache){
+columnTypeMap.clear();
+}
 for (int i = 1; i <= numCols; i++) {
 String colName = 
resultSetMetaData.getColumnName(i).toLowerCase();
 String colKey = getStateKey(tableName, colName);

http://git-wip-us.apache.org/repos/asf/nifi/blob/a29348f2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
--
diff --git