Repository: nifi
Updated Branches:
  refs/heads/master fda15d916 -> 938c7cccb
NIFI-2749

Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #997

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/938c7ccc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/938c7ccc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/938c7ccc

Branch: refs/heads/master
Commit: 938c7cccb8b251d4a1390cf0078b14f53bc94a57
Parents: fda15d9
Author: Peter Wicks <pwi...@micron.com>
Authored: Thu Sep 8 21:37:21 2016 -0600
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Wed Sep 21 11:16:57 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 17 +++++++++--------
 .../standard/QueryDatabaseTableTest.java | 13 +++++++++++--
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/938c7ccc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 31bec27..278cc30 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -270,11 +270,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex)); } - // Add 
maximum values as attributes - for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) { - fileToProcess = session.putAttribute(fileToProcess, "maxvalue." + entry.getKey(), entry.getValue()); - } - logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()});
@@ -290,13 +285,19 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
fragmentIndex++; } - //set count on all FlowFiles - if(maxRowsPerFlowFile > 0) { - for (int i = 0; i < resultSetFlowFiles.size(); i++) { + for (int i = 0; i < resultSetFlowFiles.size(); i++) { + // Add maximum values as attributes + for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) { + resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), entry.getValue())); + } + + //set count on all FlowFiles + if(maxRowsPerFlowFile > 0) { resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex))); } } + } catch (final SQLException e) { throw e; }

http://git-wip-us.apache.org/repos/asf/nifi/blob/938c7ccc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 974a835..f3904ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -184,15 
+184,21 @@ public class QueryDatabaseTableTest {
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); runner.setIncomingConnection(false); runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"2"); runner.run(); - runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2); MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2"); - assertEquals(3, getNumberOfRecordsFromStream(in)); + assertEquals(2, getNumberOfRecordsFromStream(in)); + + flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1); + assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); + in = new ByteArrayInputStream(flowFile.toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); runner.clearTransferState(); // Run again, this time no flowfiles/rows should be transferred
@@ -200,6 +206,9 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); runner.clearTransferState(); + //Remove Max Rows Per Flow File + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE,"0"); + // Add a new row with a higher ID and run, one flowfile with one new row should be transferred stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')"); runner.run();