Repository: nifi
Updated Branches:
  refs/heads/master 75906226a -> 0207d0813


NIFI-5604: Added property to allow empty FlowFile when no SQL is generated by 
GenerateTableFetch

Co-authored-by: Peter Wicks <patric...@gmail.com>
Signed-off-by: Peter Wicks <patric...@gmail.com>

This closes #3075.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0207d081
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0207d081
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0207d081

Branch: refs/heads/master
Commit: 0207d0813ef164ee7227ded61fb10960a4842e2d
Parents: 7590622
Author: Matthew Burgess <mattyb...@apache.org>
Authored: Thu Nov 1 22:09:13 2018 -0400
Committer: Peter Wicks <patric...@gmail.com>
Committed: Thu Nov 15 13:37:43 2018 -0700

----------------------------------------------------------------------
 .../processors/standard/GenerateTableFetch.java | 104 +++++++++++++------
 .../standard/TestGenerateTableFetch.java        |  47 ++++++++-
 2 files changed, 118 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0207d081/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index a547393..b4ba9fe 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -142,6 +142,18 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS = 
new PropertyDescriptor.Builder()
+            .name("gen-table-output-flowfile-on-zero-results")
+            .displayName("Output Empty FlowFile on Zero Results")
+            .description("Depending on the specified properties, an execution 
of this processor may not result in any SQL statements generated. When this 
property "
+                    + "is true, an empty flow file will be generated (having 
the parent of the incoming flow file if present) and transferred to the 
'success' relationship. "
+                    + "When this property is false, no output flow files will 
be generated.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
             .description("This relationship is only used when SQL query 
execution (using an incoming FlowFile) failed. The incoming FlowFile will be 
penalized and routed to this relationship. "
@@ -164,6 +176,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
         pds.add(PARTITION_SIZE);
         pds.add(COLUMN_FOR_VALUE_PARTITIONING);
         pds.add(WHERE_CLAUSE);
+        pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -247,6 +260,7 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
         final String columnForPartitioning = 
context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
         final boolean useColumnValsForPaging = 
!StringUtils.isEmpty(columnForPartitioning);
         final String customWhereClause = 
context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
+        final boolean outputEmptyFlowFileOnZeroResults = 
context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();
 
         final StateManager stateManager = context.getStateManager();
         final StateMap stateMap;
@@ -435,49 +449,75 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor {
                 }
 
                 // Generate SQL statements to read "pages" of data
-                Long limit = partitionSize == 0 ? null : (long) partitionSize;
                 final String fragmentIdentifier = UUID.randomUUID().toString();
-                for (long i = 0; i < numberOfFetches; i++) {
-                    // Add a right bounding for the partitioning column if 
necessary (only on last partition, meaning we don't need the limit)
-                    if ((i == numberOfFetches - 1) && useColumnValsForPaging 
&& (maxValueClauses.isEmpty() || customWhereClause != null)) {
-                        maxValueClauses.add(columnForPartitioning + " <= " + 
maxValueForPartitioning);
-                        limit = null;
-                    }
+                List<FlowFile> flowFilesToTransfer = new ArrayList<>();
 
-                    //Update WHERE list to include new right hand boundaries
-                    whereClause = maxValueClauses.isEmpty() ? "1=1" : 
StringUtils.join(maxValueClauses, " AND ");
+                Map<String, String> baseAttributes = new HashMap<>();
+                baseAttributes.put("generatetablefetch.tableName", tableName);
+                if (columnNames != null) {
+                    baseAttributes.put("generatetablefetch.columnNames", 
columnNames);
+                }
 
-                    Long offset = partitionSize == 0 ? null : i * 
partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
+                final String maxColumnNames = 
StringUtils.join(maxValueColumnNameList, ", ");
+                if (StringUtils.isNotBlank(maxColumnNames)) {
+                    baseAttributes.put("generatetablefetch.maxColumnNames", 
maxColumnNames);
+                }
 
-                    final String maxColumnNames = 
StringUtils.join(maxValueColumnNameList, ", ");
-                    final String query = 
dbAdapter.getSelectStatement(tableName, columnNames, whereClause, 
maxColumnNames, limit, offset, columnForPartitioning);
-                    FlowFile sqlFlowFile = (fileToProcess == null) ? 
session.create() : session.create(fileToProcess);
-                    sqlFlowFile = session.write(sqlFlowFile, out -> 
out.write(query.getBytes()));
-                    Map<String,String> attributesToAdd = new HashMap<>();
+                baseAttributes.put(FRAGMENT_ID, fragmentIdentifier);
+                baseAttributes.put(FRAGMENT_COUNT, 
String.valueOf(numberOfFetches));
 
-                    attributesToAdd.put("generatetablefetch.tableName", 
tableName);
-                    if (columnNames != null) {
-                        attributesToAdd.put("generatetablefetch.columnNames", 
columnNames);
-                    }
-                    if (StringUtils.isNotBlank(whereClause)) {
-                        attributesToAdd.put("generatetablefetch.whereClause", 
whereClause);
-                    }
-                    if (StringUtils.isNotBlank(maxColumnNames)) {
-                        
attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames);
-                    }
-                    attributesToAdd.put("generatetablefetch.limit", 
String.valueOf(limit));
+                // If there are no SQL statements to be generated, still 
output an empty flow file if specified by the user
+                if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) {
+                    FlowFile emptyFlowFile = (fileToProcess == null) ? 
session.create() : session.create(fileToProcess);
+                    Map<String, String> attributesToAdd = new HashMap<>();
+
+                    whereClause = maxValueClauses.isEmpty() ? "1=1" : 
StringUtils.join(maxValueClauses, " AND ");
+                    attributesToAdd.put("generatetablefetch.whereClause", 
whereClause);
+
+                    attributesToAdd.put("generatetablefetch.limit", null);
                     if (partitionSize != 0) {
-                        attributesToAdd.put("generatetablefetch.offset", 
String.valueOf(offset));
+                        attributesToAdd.put("generatetablefetch.offset", null);
                     }
                     // Add fragment attributes
-                    attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
-                    attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i));
-                    attributesToAdd.put(FRAGMENT_COUNT, 
String.valueOf(numberOfFetches));
+                    attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(0));
+
+                    attributesToAdd.putAll(baseAttributes);
+                    emptyFlowFile = session.putAllAttributes(emptyFlowFile, 
attributesToAdd);
+                    flowFilesToTransfer.add(emptyFlowFile);
+                } else {
+                    Long limit = partitionSize == 0 ? null : (long) 
partitionSize;
+                    for (long i = 0; i < numberOfFetches; i++) {
+                        // Add a right bounding for the partitioning column if 
necessary (only on last partition, meaning we don't need the limit)
+                        if ((i == numberOfFetches - 1) && 
useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != 
null)) {
+                            maxValueClauses.add(columnForPartitioning + " <= " 
+ maxValueForPartitioning);
+                            limit = null;
+                        }
+
+                        //Update WHERE list to include new right hand 
boundaries
+                        whereClause = maxValueClauses.isEmpty() ? "1=1" : 
StringUtils.join(maxValueClauses, " AND ");
+                        Long offset = partitionSize == 0 ? null : i * 
partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
 
-                    sqlFlowFile = session.putAllAttributes(sqlFlowFile, 
attributesToAdd);
-                    session.transfer(sqlFlowFile, REL_SUCCESS);
+                        final String query = 
dbAdapter.getSelectStatement(tableName, columnNames, whereClause, 
maxColumnNames, limit, offset, columnForPartitioning);
+                        FlowFile sqlFlowFile = (fileToProcess == null) ? 
session.create() : session.create(fileToProcess);
+                        sqlFlowFile = session.write(sqlFlowFile, out -> 
out.write(query.getBytes()));
+                        Map<String,String> attributesToAdd = new HashMap<>();
+
+                        attributesToAdd.put("generatetablefetch.whereClause", 
whereClause);
+                        attributesToAdd.put("generatetablefetch.limit", (limit 
== null) ? null : limit.toString());
+                        if (partitionSize != 0) {
+                            attributesToAdd.put("generatetablefetch.offset", 
String.valueOf(offset));
+                        }
+                        // Add fragment attributes
+                        attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i));
+
+                        attributesToAdd.putAll(baseAttributes);
+                        sqlFlowFile = session.putAllAttributes(sqlFlowFile, 
attributesToAdd);
+                        flowFilesToTransfer.add(sqlFlowFile);
+                    }
                 }
 
+                session.transfer(flowFilesToTransfer, REL_SUCCESS);
+
                 if (fileToProcess != null) {
                     session.remove(fileToProcess);
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0207d081/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
index 6e0c397..8ccca2c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java
@@ -54,6 +54,7 @@ import static 
org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor
 import static 
org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.REL_SUCCESS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
@@ -539,11 +540,55 @@ public class TestGenerateTableFetch {
 
         runner.run();
         runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 
1);
-        
runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT
 * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID");
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE 
ID <= 2 ORDER BY ID");
+        flowFile.assertAttributeExists("generatetablefetch.limit");
+        flowFile.assertAttributeEquals("generatetablefetch.limit", null);
         runner.clearTransferState();
     }
 
     @Test
+    public void testFlowFileGeneratedOnZeroResults() throws SQLException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
bucket integer not null)");
+
+        runner.setProperty(GenerateTableFetch.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "ID,BUCKET");
+        runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
+        // Set partition size to 0 so we can see that the flow file gets all 
rows
+        runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1");
+        
runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, 
"false");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 
0);
+        runner.clearTransferState();
+
+        
runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, 
"true");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 
1);
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", 
flowFile.getAttribute("generatetablefetch.tableName"));
+        assertEquals("ID,BUCKET", 
flowFile.getAttribute("generatetablefetch.columnNames"));
+        assertEquals("1=1", 
flowFile.getAttribute("generatetablefetch.whereClause"));
+        assertEquals("ID", 
flowFile.getAttribute("generatetablefetch.maxColumnNames"));
+        assertNull(flowFile.getAttribute("generatetablefetch.limit"));
+        assertNull(flowFile.getAttribute("generatetablefetch.offset"));
+        assertEquals("0", flowFile.getAttribute("fragment.index"));
+        assertEquals("0", flowFile.getAttribute("fragment.count"));
+    }
+
+    @Test
     public void testMultiplePartitions() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
 
         // load test data to database

Reply via email to