Repository: nifi
Updated Branches:
  refs/heads/master be0949570 -> 75906226a


NIFI-5780 Add pre and post statements to ExecuteSQL and ExecuteSQLRecord

Signed-off-by: Peter Wicks <patric...@gmail.com>

This closes #3156.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/75906226
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/75906226
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/75906226

Branch: refs/heads/master
Commit: 75906226a6f265acc7d87414ff477a5b0646b6d8
Parents: be09495
Author: yjhyjhyjh0 <yjhyjhy...@gmail.com>
Authored: Thu Nov 8 00:25:50 2018 +0800
Committer: Peter Wicks <patric...@gmail.com>
Committed: Thu Nov 15 13:18:31 2018 -0700

----------------------------------------------------------------------
 .../nifi/processors/hive/SelectHiveQL.java      |   4 +-
 .../processors/standard/AbstractExecuteSQL.java |  79 +++++++++-
 .../nifi/processors/standard/ExecuteSQL.java    |   2 +
 .../processors/standard/ExecuteSQLRecord.java   |   2 +
 .../processors/standard/TestExecuteSQL.java     | 146 +++++++++++++++++++
 .../standard/TestExecuteSQLRecord.java          | 132 +++++++++++++++++
 6 files changed, 362 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index 5342c09..3b8576b 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -109,7 +109,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
     public static final PropertyDescriptor HIVEQL_PRE_QUERY = new 
PropertyDescriptor.Builder()
             .name("hive-pre-query")
             .displayName("HiveQL Pre-Query")
-            .description("HiveQL pre-query to execute. Semicolon-delimited 
list of queries. "
+            .description("A semicolon-delimited list of queries executed 
before the main SQL query is executed. "
                     + "Example: 'set tez.queue.name=queue1; set 
hive.exec.orc.split.strategy=ETL; set 
hive.exec.reducers.bytes.per.reducer=1073741824'. "
                     + "Note, the results/outputs of these queries will be 
suppressed if successfully executed.")
             .required(false)
@@ -129,7 +129,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
     public static final PropertyDescriptor HIVEQL_POST_QUERY = new 
PropertyDescriptor.Builder()
             .name("hive-post-query")
             .displayName("HiveQL Post-Query")
-            .description("HiveQL post-query to execute. Semicolon-delimited 
list of queries. "
+            .description("A semicolon-delimited list of queries executed after 
the main SQL query is executed. "
                     + "Note, the results/outputs of these queries will be 
suppressed if successfully executed.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index bf46549..d1fabef 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
@@ -44,6 +45,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -82,6 +84,17 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
             .identifiesControllerService(DBCPService.class)
             .build();
 
+    /**
+     * Optional property holding a semicolon-delimited list of statements to run before the main SQL query.
+     * The value must be non-empty when set and supports expression language evaluated against FlowFile attributes.
+     */
+    public static final PropertyDescriptor SQL_PRE_QUERY = new 
PropertyDescriptor.Builder()
+            .name("sql-pre-query")
+            .displayName("SQL Pre-Query")
+            .description("A semicolon-delimited list of queries executed 
before the main SQL query is executed. " +
+                    "For example, set session properties before main query. " +
+                    "Results/outputs from these queries will be suppressed if 
there are no errors.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     public static final PropertyDescriptor SQL_SELECT_QUERY = new 
PropertyDescriptor.Builder()
             .name("SQL select query")
             .description("The SQL select query to execute. The query can be 
empty, a constant value, or built from attributes "
@@ -94,6 +107,17 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    /**
+     * Optional property holding a semicolon-delimited list of statements to run after the main SQL query.
+     * The value must be non-empty when set and supports expression language evaluated against FlowFile attributes.
+     */
+    public static final PropertyDescriptor SQL_POST_QUERY = new 
PropertyDescriptor.Builder()
+            .name("sql-post-query")
+            .displayName("SQL Post-Query")
+            .description("A semicolon-delimited list of queries executed after 
the main SQL query is executed. " +
+                    "Example like setting session properties after main query. 
" +
+                    "Results/outputs from these queries will be suppressed if 
there are no errors.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     public static final PropertyDescriptor QUERY_TIMEOUT = new 
PropertyDescriptor.Builder()
             .name("Max Wait Time")
             .description("The maximum amount of time allowed for a running SQL 
select query "
@@ -177,10 +201,12 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
         final Integer maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
         final Integer outputBatchSizeField = 
context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final int outputBatchSize = outputBatchSizeField == null ? 0 : 
outputBatchSizeField;
+        List<String> preQueries = 
getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
+        List<String> postQueries = 
getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
 
         SqlWriter sqlWriter = configureSqlWriter(session, context, 
fileToProcess);
 
-        final String selectQuery;
+        String selectQuery;
         if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
             selectQuery = 
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         } else {
@@ -196,6 +222,14 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
              final PreparedStatement st = con.prepareStatement(selectQuery)) {
             st.setQueryTimeout(queryTimeout); // timeout in seconds
 
+            // Execute pre-query, throw exception and cleanup Flow Files if 
fail
+            Pair<String,SQLException> failure = executeConfigStatements(con, 
preQueries);
+            if (failure != null) {
+                // In case of failure, assigning config query to "selectQuery" 
to follow current error handling
+                selectQuery = failure.getLeft();
+                throw failure.getRight();
+            }
+
             if (fileToProcess != null) {
                 JdbcCommon.setParameters(st, fileToProcess.getAttributes());
             }
@@ -317,6 +351,14 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                 }
             }
 
+            // Execute post-query, throw exception and cleanup Flow Files if 
fail
+            failure = executeConfigStatements(con, postQueries);
+            if (failure != null) {
+                selectQuery = failure.getLeft();
+                resultSetFlowFiles.forEach(ff -> session.remove(ff));
+                throw failure.getRight();
+            }
+
             // Transfer any remaining files to SUCCESS
             session.transfer(resultSetFlowFiles, REL_SUCCESS);
             resultSetFlowFiles.clear();
@@ -365,5 +407,40 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
         }
     }
 
+    /*
+     * Executes the given queries, in list order, on the supplied connection.
+     * Each query runs in its own Statement, which is closed before the next one starts.
+     * Execution stops at the first query that throws an SQLException.
+     *
+     * Returns null when the list is null or empty, or when every query succeeds.
+     * On failure returns a Pair whose left value is the failing query string and
+     * whose right value is the SQLException raised for it.
+     */
+    protected Pair<String,SQLException> executeConfigStatements(final 
Connection con, final List<String> configQueries){
+        if (configQueries == null || configQueries.isEmpty()) {
+            return null;
+        }
+
+        for (String confSQL : configQueries) {
+            try(final Statement st = con.createStatement()){
+                st.execute(confSQL);
+            } catch (SQLException e) {
+                // Hand the failing query and its exception back to the caller rather than throwing here
+                return Pair.of(confSQL, e);
+            }
+        }
+        return null;
+    }
+
+    /*
+     * Splits a config property value into individual queries.
+     *
+     * The value is split on every ';' character; each piece is trimmed and blank pieces are dropped.
+     * Returns null when the value is null, empty or only whitespace; otherwise returns the trimmed
+     * queries in their original order (the list can be empty, e.g. for a value of ";").
+     *
+     * Note: the split is purely textual, so a ';' inside a quoted SQL literal also splits the statement.
+     */
+    protected List<String> getQueries(final String value) {
+        if (value == null || value.length() == 0 || value.trim().length() == 
0) {
+            return null;
+        }
+        final List<String> queries = new LinkedList<>();
+        for (String query : value.split(";")) {
+            if (query.trim().length() > 0) {
+                queries.add(query.trim());
+            }
+        }
+        return queries;
+    }
+
     protected abstract SqlWriter configureSqlWriter(ProcessSession session, 
ProcessContext context, FlowFile fileToProcess);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index cc6d508..9c61793 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -114,7 +114,9 @@ public class ExecuteSQL extends AbstractExecuteSQL {
 
         final List<PropertyDescriptor> pds = new ArrayList<>();
         pds.add(DBCP_SERVICE);
+        pds.add(SQL_PRE_QUERY);
         pds.add(SQL_SELECT_QUERY);
+        pds.add(SQL_POST_QUERY);
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);

http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 31d0ec8..5a84458 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -121,7 +121,9 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL {
 
         final List<PropertyDescriptor> pds = new ArrayList<>();
         pds.add(DBCP_SERVICE);
+        pds.add(SQL_PRE_QUERY);
         pds.add(SQL_SELECT_QUERY);
+        pds.add(SQL_POST_QUERY);
         pds.add(QUERY_TIMEOUT);
         pds.add(RECORD_WRITER_FACTORY);
         pds.add(NORMALIZE_NAMES);

http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 63de91a..199bd94 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -544,7 +544,153 @@ public class TestExecuteSQL {
         }
     }
 
+    /**
+     * Configures two semicolon-separated pre-queries plus a select, and verifies that one FlowFile is
+     * routed to success with a row count of 1 and that exactly one record can be read back from its content.
+     */
+    @Test
+    public void testPreQuery() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Deliberately ignored; presumably the table does not exist yet on a fresh database
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+        stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+
+        final InputStream in = new 
ByteArrayInputStream(firstFlowFile.toByteArray());
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(in, datumReader)) {
+            GenericRecord record = null;
+            long recordsFromStream = 0;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us 
from
+                // allocating and garbage collecting many objects for files 
with
+                // many items.
+                record = dataFileReader.next(record);
+                recordsFromStream += 1;
+            }
+
+            assertEquals(1, recordsFromStream);
+        }
+    }
+
+    /**
+     * Configures pre-queries, a select and two semicolon-separated post-queries, and verifies that one
+     * FlowFile is routed to success with a row count of 1 and exactly one record in its content.
+     */
+    @Test
+    public void testPostQuery() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Deliberately ignored; presumably the table does not exist yet on a fresh database
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+        stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+
+        final InputStream in = new 
ByteArrayInputStream(firstFlowFile.toByteArray());
+        final DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+        try (DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<>(in, datumReader)) {
+            GenericRecord record = null;
+            long recordsFromStream = 0;
+            while (dataFileReader.hasNext()) {
+                // Reuse record object by passing it to next(). This saves us 
from
+                // allocating and garbage collecting many objects for files 
with
+                // many items.
+                record = dataFileReader.next(record);
+                recordsFromStream += 1;
+            }
+
+            assertEquals(1, recordsFromStream);
+        }
+    }
+
+    /**
+     * Configures a pre-query that is expected to fail (procedure called without its argument) and
+     * verifies that the single incoming FlowFile is routed to failure.
+     */
+    @Test
+    public void testPreQueryFail() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Deliberately ignored; presumably the table does not exist yet on a fresh database
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        // Simulate failure by not provide parameter
+        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+    }
+
+    /**
+     * Configures valid pre-queries and a post-query that is expected to fail, and verifies that the
+     * single FlowFile routed to failure still carries the original input content ("test").
+     */
+    @Test
+    public void testPostQueryFail() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Deliberately ignored; presumably the table does not exist yet on a fresh database
+        }
 
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        // Simulate failure by not provide parameter
+        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
+        firstFlowFile.assertContentEquals("test");
+    }
 
     /**
      * Simple implementation only for ExecuteSQL processor testing.

http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 04c4c00..03cdbfc 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -350,6 +350,138 @@ public class TestExecuteSQLRecord {
         assertEquals(durationTime, fetchTime + executionTime);
     }
 
+    /**
+     * Configures two semicolon-separated pre-queries, a select and a MockRecordWriter, and verifies
+     * that one FlowFile is routed to success with a row count of 1.
+     */
+    @Test
+    public void testPreQuery() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Deliberately ignored; presumably the table does not exist yet on a fresh database
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+        stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+    }
+
+    /**
+     * Configures pre-queries, a select, two semicolon-separated post-queries and a MockRecordWriter,
+     * and verifies that one FlowFile is routed to success with a row count of 1.
+     */
+    @Test
+    public void testPostQuery() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Deliberately ignored; presumably the table does not exist yet on a fresh database
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+        stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+    }
+
+    /**
+     * Configures a pre-query that is expected to fail (procedure called without its argument) together
+     * with a MockRecordWriter, and verifies that the single incoming FlowFile is routed to failure.
+     */
+    @Test
+    public void testPreQueryFail() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Deliberately ignored; presumably the table does not exist yet on a fresh database
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        // Simulate failure by not provide parameter
+        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+    }
+
+    /**
+     * Configures valid pre-queries, a post-query that is expected to fail and a MockRecordWriter, and
+     * verifies that the single FlowFile routed to failure still carries the original input content ("test").
+     */
+    @Test
+    public void testPostQueryFail() throws Exception {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Deliberately ignored; presumably the table does not exist yet on a fresh database
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        // Simulate failure by not provide parameter
+        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
+        firstFlowFile.assertContentEquals("test");
+    }
+
 
     /**
      * Simple implementation only for ExecuteSQL processor testing.

Reply via email to