This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 08ff54f5fb Add auto commit property to QueryDatabaseTable and 
QueryDatabaseTableRecord processors to allow disabling auto commit so PostgreSQL 
Fetch Size will work
08ff54f5fb is described below

commit 08ff54f5fb712d01f372fd25a2d904d605e271ba
Author: Jim Steinebrey <jrsteineb...@gmail.com>
AuthorDate: Tue Mar 19 11:54:38 2024 -0400

    Add auto commit property to QueryDatabaseTable and QueryDatabaseTableRecord 
processors to allow disabling auto commit so PostgreSQL Fetch Size will work
    
    NIFI-1931 Add proper default value for auto commit (false) to 
PostgreSQLDatabaseAdapter to allow FETCH_SIZE to be honored on reads.
    
    NIFI-1931 Added customValidate code to check the auto commit property 
setting against the db adapter's required auto commit setting and give 
validation error message if they do not match.
    
    NIFI-1931 Added automated test to check the Auto Commit customValidate 
error message.
    
    NIFI-1931 remove clearDefaultValue() because it is not needed since 
required = false already defaults it to null.
    
    This closes #8534
    
    Signed-off-by: Matt Burgess <mattyb...@apache.org>
---
 .../standard/AbstractQueryDatabaseTable.java       |  71 ++++++++++++++-
 .../processors/standard/QueryDatabaseTable.java    |   1 +
 .../standard/QueryDatabaseTableRecord.java         |   1 +
 .../processors/standard/db/DatabaseAdapter.java    |  13 +++
 .../db/impl/PostgreSQLDatabaseAdapter.java         |  18 ++++
 .../processors/standard/QueryDatabaseTableIT.java  |  78 ++++++++++++++++
 .../standard/QueryDatabaseTableRecordIT.java       |  78 ++++++++++++++++
 .../standard/QueryDatabaseTableRecordTest.java     |  97 ++++++++++++++++++--
 .../standard/QueryDatabaseTableTest.java           | 101 +++++++++++++++++++--
 .../db/impl/TestPostgreSQLDatabaseAdapter.java     |  16 ++++
 10 files changed, 456 insertions(+), 18 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
index e5fc6745d6..7f7a870fb7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -90,16 +90,34 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
             "TRANSACTION_SERIALIZABLE"
     );
 
+    private static final String FETCH_SIZE_NAME = "Fetch Size";
+    private static final String AUTO_COMMIT_NAME = "Set Auto Commit";
+
     public static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Fetch Size")
+            .name(FETCH_SIZE_NAME)
             .description("The number of result rows to be fetched from the 
result set at a time. This is a hint to the database driver and may not be "
-                    + "honored and/or exact. If the value specified is zero, 
then the hint is ignored.")
+                    + "honored and/or exact. If the value specified is zero, 
then the hint is ignored. "
+                    + "If using PostgreSQL, then '" + AUTO_COMMIT_NAME + "' 
must be equal to 'false' to cause '" + FETCH_SIZE_NAME + "' to take effect.")
             .defaultValue("0")
             .required(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .build();
 
+    public static final PropertyDescriptor AUTO_COMMIT = new 
PropertyDescriptor.Builder()
+            .name(AUTO_COMMIT_NAME)
+            .description("Allows enabling or disabling the auto commit 
functionality of the DB connection. Default value is 'No value set'. " +
+                    "'No value set' will leave the db connection's auto commit 
mode unchanged. " +
+                    "For some JDBC drivers such as PostgreSQL driver, it is 
required to disable the auto commit functionality " +
+                    "to get the '" + FETCH_SIZE_NAME + "' setting to take 
effect. " +
+                    "When auto commit is enabled, PostgreSQL driver ignores '" 
+ FETCH_SIZE_NAME + "' setting and loads all rows of the result set to memory 
at once. " +
+                    "This could lead for a large amount of memory usage when 
executing queries which fetch large data sets. " +
+                    "More Details of this behaviour in PostgreSQL driver can 
be found in https://jdbc.postgresql.org//documentation/head/query.html.";)
+            .allowableValues("true", "false")
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .required(false)
+            .build();
+
     public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new 
PropertyDescriptor.Builder()
             .name("qdbt-max-rows")
             .displayName("Max Rows Per Flow File")
@@ -196,6 +214,23 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
                     .build());
         }
 
+        final Boolean propertyAutoCommit = 
validationContext.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean();
+        final Integer fetchSize = 
validationContext.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final DatabaseAdapter dbAdapter = 
dbAdapters.get(validationContext.getProperty(DB_TYPE).getValue());
+        final Boolean adapterAutoCommit = dbAdapter == null
+                ? null
+                : dbAdapter.getAutoCommitForReads(fetchSize).orElse(null);
+        if (adapterAutoCommit != null && propertyAutoCommit != null
+            && propertyAutoCommit != adapterAutoCommit ) {
+            results.add(new ValidationResult.Builder().valid(false)
+                    .subject(AUTO_COMMIT.getDisplayName())
+                    .input(String.valueOf(propertyAutoCommit))
+                    .explanation(String.format("'%s' must be set to '%s' 
because '%s' %s requires it to be '%s'",
+                            AUTO_COMMIT.getDisplayName(), adapterAutoCommit,
+                            dbAdapter.getName(), DB_TYPE.getDisplayName(), 
adapterAutoCommit))
+                    .build());
+        }
+
         return results;
     }
 
@@ -304,7 +339,7 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
                     }
                 }
             } catch (final Exception e) {
-                logger.error("Unable to execute SQL select query {} due to 
{}", new Object[]{selectMaxQuery, e});
+                logger.error("Unable to execute SQL select query {} due to 
{}", selectMaxQuery, e);
                 context.yield();
             }
         }
@@ -343,6 +378,24 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
             if (logger.isDebugEnabled()) {
                 logger.debug("Executing query {}", new Object[] { selectQuery 
});
             }
+
+            final boolean originalAutoCommit = con.getAutoCommit();
+            final Boolean propertyAutoCommitValue = 
context.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean();
+            // If user sets AUTO_COMMIT property to non-null (i.e. true or 
false), then the property value overrides the dbAdapter's value
+            final Boolean setAutoCommitValue =
+                    dbAdapter == null || propertyAutoCommitValue != null
+                            ? propertyAutoCommitValue
+                            : 
dbAdapter.getAutoCommitForReads(fetchSize).orElse(null);
+            if (setAutoCommitValue != null && originalAutoCommit != 
setAutoCommitValue) {
+                try {
+                    con.setAutoCommit(setAutoCommitValue);
+                    logger.debug("Driver connection changed to 
setAutoCommit({})", setAutoCommitValue);
+                } catch (Exception ex) {
+                    logger.debug("Failed to setAutoCommit({}) due to {}: {}",
+                            setAutoCommitValue, ex.getClass().getName(), 
ex.getMessage());
+                }
+            }
+
             try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
                 int fragmentIndex=0;
                 // Max values will be updated in the state property map by the 
callback
@@ -441,12 +494,22 @@ public abstract class AbstractQueryDatabaseTable extends 
AbstractDatabaseFetchPr
                 }
             } catch (final SQLException e) {
                 throw e;
+            } finally {
+                if (con.getAutoCommit() != originalAutoCommit) {
+                    try {
+                        con.setAutoCommit(originalAutoCommit);
+                        logger.debug("Driver connection reset to original 
setAutoCommit({})", originalAutoCommit);
+                    } catch (Exception ex) {
+                        logger.debug("Failed to setAutoCommit({}) due to {}: 
{}",
+                                originalAutoCommit, ex.getClass().getName(), 
ex.getMessage());
+                    }
+                }
             }
 
             session.transfer(resultSetFlowFiles, REL_SUCCESS);
 
         } catch (final ProcessException | SQLException e) {
-            logger.error("Unable to execute SQL select query {} due to {}", 
new Object[]{selectQuery, e});
+            logger.error("Unable to execute SQL select query {} due to {}", 
selectQuery, e);
             if (!resultSetFlowFiles.isEmpty()) {
                 session.remove(resultSetFlowFiles);
             }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 24123729b8..51fbc41409 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -109,6 +109,7 @@ public class QueryDatabaseTable extends 
AbstractQueryDatabaseTable {
         pds.add(INITIAL_LOAD_STRATEGY);
         pds.add(QUERY_TIMEOUT);
         pds.add(FETCH_SIZE);
+        pds.add(AUTO_COMMIT);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
         pds.add(MAX_FRAGMENTS);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
index 5838d7e46c..2004649976 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -208,6 +208,7 @@ public class QueryDatabaseTableRecord extends 
AbstractQueryDatabaseTable {
         pds.add(INITIAL_LOAD_STRATEGY);
         pds.add(QUERY_TIMEOUT);
         pds.add(FETCH_SIZE);
+        pds.add(AUTO_COMMIT);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
         pds.add(MAX_FRAGMENTS);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
index ab661998ed..65b43ff8b7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -211,6 +212,18 @@ public interface DatabaseAdapter {
         return Collections.singletonList(createTableStatement.toString());
     }
 
+    /**
+     * Get the auto commit mode to use for reading from this database type.
+     * Most databases do not care which auto commit mode is used to read.
+     * For PostgreSQL it can make a difference.
+     * @param fetchSize The number of rows to retrieve at a time. Value of 0 
means retrieve all rows at once.
+     * @return Optional.empty() if auto commit mode does not matter and can be 
left as is.
+     *         Return true or false to indicate whether auto commit needs to 
be true or false for this database.
+     */
+    default Optional<Boolean> getAutoCommitForReads(Integer fetchSize) {
+        return Optional.empty();
+    }
+
     default String getSQLForDataType(int sqlType) {
         return JDBCType.valueOf(sqlType).getName();
     }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
index 5e48818600..8ba7b64b22 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/PostgreSQLDatabaseAdapter.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static java.sql.Types.CHAR;
@@ -160,6 +161,23 @@ public class PostgreSQLDatabaseAdapter extends 
GenericDatabaseAdapter {
                 .toString());
     }
 
+    /**
+     * Get the auto commit mode to use for reading from this database type.
+     * For PostgreSQL databases, auto commit mode must be set to false to 
cause a fetchSize other than 0 to take effect.
+     * More Details of this behaviour in PostgreSQL driver can be found in 
https://jdbc.postgresql.org//documentation/head/query.html.";)
+     * For PostgreSQL, if autocommit is TRUE, then fetch size is treated as 0 
which loads all rows of the result set to memory at once.
+     * @param fetchSize The number of rows to retrieve at a time. Value of 0 
means retrieve all rows at once.
+     * @return Optional.empty() if auto commit mode does not matter and can be 
left as is.
+     *         Return true or false to indicate whether auto commit needs to 
be true or false for this database.
+     */
+    @Override
+    public Optional<Boolean> getAutoCommitForReads(Integer fetchSize) {
+        if (fetchSize != null && fetchSize != 0) {
+            return Optional.of(Boolean.FALSE);
+        }
+        return Optional.empty();
+    }
+
     @Override
     public String getSQLForDataType(int sqlType) {
         switch (sqlType) {
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
new file mode 100644
index 0000000000..602a87e945
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.dbcp.DBCPConnectionPool;
+import org.apache.nifi.dbcp.utils.DBCPProperties;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter;
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class QueryDatabaseTableIT extends QueryDatabaseTableTest {
+    private static PostgreSQLContainer<?> postgres;
+
+    @BeforeAll
+    public static void setupBeforeClass() {
+        postgres = new PostgreSQLContainer<>("postgres:9.6.12")
+                .withInitScript("PutDatabaseRecordIT/create-person-table.sql");
+        postgres.start();
+    }
+
+    @AfterAll
+    public static void cleanUpAfterClass() {
+        if (postgres != null) {
+            postgres.close();
+            postgres = null;
+        }
+    }
+
+    @Override
+    public DatabaseAdapter createDatabaseAdapter() {
+        return new PostgreSQLDatabaseAdapter();
+    }
+
+    @Override
+    public void createDbcpControllerService() throws InitializationException {
+        final DBCPConnectionPool connectionPool = new DBCPConnectionPool();
+        runner.addControllerService("dbcp", connectionPool);
+        runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL, 
postgres.getJdbcUrl());
+        runner.setProperty(connectionPool, DBCPProperties.DB_USER, 
postgres.getUsername());
+        runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD, 
postgres.getPassword());
+        runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME, 
postgres.getDriverClassName());
+        runner.enableControllerService(connectionPool);
+    }
+
+    @Test
+    public void testAddedRowsAutoCommitTrue() throws SQLException, IOException 
{
+        // this test in the base class is not valid for PostgreSQL so check 
the validation error message.
+        final AssertionError assertionError = 
assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue);
+        assertThat(assertionError.getMessage(), equalTo("Processor has 1 
validation failures:\n" +
+                "'Set Auto Commit' validated against 'true' is invalid because 
'Set Auto Commit' " +
+                "must be set to 'false' because 'PostgreSQL' Database Type 
requires it to be 'false'\n"));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
new file mode 100644
index 0000000000..4a98f0d48d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.dbcp.DBCPConnectionPool;
+import org.apache.nifi.dbcp.utils.DBCPProperties;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.db.impl.PostgreSQLDatabaseAdapter;
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class QueryDatabaseTableRecordIT extends QueryDatabaseTableRecordTest {
+    private static PostgreSQLContainer<?> postgres;
+
+    @BeforeAll
+    public static void setupBeforeClass() {
+        postgres = new PostgreSQLContainer<>("postgres:9.6.12")
+                .withInitScript("PutDatabaseRecordIT/create-person-table.sql");
+        postgres.start();
+    }
+
+    @AfterAll
+    public static void cleanUpAfterClass() {
+        if (postgres != null) {
+            postgres.close();
+            postgres = null;
+        }
+    }
+
+    @Override
+    public DatabaseAdapter createDatabaseAdapter() {
+        return new PostgreSQLDatabaseAdapter();
+    }
+
+    @Override
+    public void createDbcpControllerService() throws InitializationException {
+        final DBCPConnectionPool connectionPool = new DBCPConnectionPool();
+        runner.addControllerService("dbcp", connectionPool);
+        runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL, 
postgres.getJdbcUrl());
+        runner.setProperty(connectionPool, DBCPProperties.DB_USER, 
postgres.getUsername());
+        runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD, 
postgres.getPassword());
+        runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME, 
postgres.getDriverClassName());
+        runner.enableControllerService(connectionPool);
+    }
+
+    @Test
+    public void testAddedRowsAutoCommitTrue() throws SQLException, IOException 
{
+        // this test in the base class is not valid for PostgreSQL so check 
the validation error message.
+        final AssertionError assertionError = 
assertThrows(AssertionError.class, super::testAddedRowsAutoCommitTrue);
+        assertThat(assertionError.getMessage(), equalTo("Processor has 1 
validation failures:\n" +
+                "'Set Auto Commit' validated against 'true' is invalid because 
'Set Auto Commit' " +
+                "must be set to 'false' because 'PostgreSQL' Database Type 
requires it to be 'false'\n"));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
index 91b288df16..7e7de06992 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecordTest.java
@@ -71,7 +71,7 @@ public class QueryDatabaseTableRecordTest {
     private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
 
     MockQueryDatabaseTableRecord processor;
-    private TestRunner runner;
+    protected TestRunner runner;
     private final static String DB_LOCATION = "target/db_qdt";
     private DatabaseAdapter dbAdapter;
     private HashMap<String, DatabaseAdapter> origDbAdapters;
@@ -109,18 +109,25 @@ public class QueryDatabaseTableRecordTest {
         System.clearProperty("derby.stream.error.file");
     }
 
+    public DatabaseAdapter createDatabaseAdapter() {
+        return new GenericDatabaseAdapter();
+    }
 
-    @BeforeEach
-    public void setup() throws InitializationException, IOException {
+    public void createDbcpControllerService() throws InitializationException {
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
         final Map<String, String> dbcpProperties = new HashMap<>();
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+    }
+
+    @BeforeEach
+    public void setup() throws InitializationException, IOException {
         origDbAdapters = new HashMap<>(QueryDatabaseTableRecord.dbAdapters);
-        dbAdapter = new GenericDatabaseAdapter();
+        dbAdapter = createDatabaseAdapter();
         QueryDatabaseTableRecord.dbAdapters.put(dbAdapter.getName(), 
dbAdapter);
         processor = new MockQueryDatabaseTableRecord();
         runner = TestRunners.newTestRunner(processor);
-        runner.addControllerService("dbcp", dbcp, dbcpProperties);
-        runner.enableControllerService(dbcp);
+        createDbcpControllerService();
         runner.setProperty(QueryDatabaseTableRecord.DBCP_SERVICE, "dbcp");
         runner.setProperty(QueryDatabaseTableRecord.DB_TYPE, 
dbAdapter.getName());
         runner.getStateManager().clear(Scope.CLUSTER);
@@ -371,6 +378,82 @@ public class QueryDatabaseTableRecordTest {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testAddedRowsAutoCommitTrue() throws SQLException, IOException 
{
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, 
"ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, 
"2");
+        runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
+        runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "true");
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", 
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        flowFile.assertAttributeEquals("record.count", "2");
+
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        flowFile.assertAttributeEquals("record.count", "1");
+    }
+
+    @Test
+    public void testAddedRowsAutoCommitFalse() throws SQLException, 
IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTableRecord.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTableRecord.MAX_VALUE_COLUMN_NAMES, 
"ID");
+        runner.setProperty(QueryDatabaseTableRecord.MAX_ROWS_PER_FLOW_FILE, 
"2");
+        runner.setProperty(QueryDatabaseTableRecord.FETCH_SIZE, "2");
+        runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "false");
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(QueryDatabaseTableRecord.REL_SUCCESS, 2);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", 
flowFile.getAttribute(QueryDatabaseTableRecord.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        flowFile.assertAttributeEquals("record.count", "2");
+
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTableRecord.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        flowFile.assertAttributeEquals("record.count", "1");
+    }
+
     @Test
     public void testAddedRowsTwoTables() throws SQLException {
 
@@ -1415,7 +1498,7 @@ public class QueryDatabaseTableRecordTest {
     }
 
     @Stateful(scopes = Scope.CLUSTER, description = "Mock for 
QueryDatabaseTableRecord processor")
-    private static class MockQueryDatabaseTableRecord extends 
QueryDatabaseTableRecord {
+    protected static class MockQueryDatabaseTableRecord extends 
QueryDatabaseTableRecord {
         void putColumnType(String colName, Integer colType) {
             columnTypeMap.put(colName, colType);
         }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 17ce74bebb..8f360eeb50 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -74,7 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class QueryDatabaseTableTest {
 
     MockQueryDatabaseTable processor;
-    private TestRunner runner;
+    protected TestRunner runner;
     private final static String DB_LOCATION = "target/db_qdt";
     private DatabaseAdapter dbAdapter;
     private HashMap<String, DatabaseAdapter> origDbAdapters;
@@ -113,18 +113,25 @@ public class QueryDatabaseTableTest {
         System.clearProperty("derby.stream.error.file");
     }
 
+    public DatabaseAdapter createDatabaseAdapter() {
+        return new GenericDatabaseAdapter();
+    }
 
-    @BeforeEach
-    public void setup() throws InitializationException, IOException {
+    public void createDbcpControllerService() throws InitializationException {
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
         final Map<String, String> dbcpProperties = new HashMap<>();
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+    }
+
+    @BeforeEach
+    public void setup() throws InitializationException, IOException {
         origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters);
-        dbAdapter = new GenericDatabaseAdapter();
+        dbAdapter = createDatabaseAdapter();
         QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
         processor = new MockQueryDatabaseTable();
         runner = TestRunners.newTestRunner(processor);
-        runner.addControllerService("dbcp", dbcp, dbcpProperties);
-        runner.enableControllerService(dbcp);
+        createDbcpControllerService();
         runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp");
         runner.setProperty(QueryDatabaseTable.DB_TYPE, dbAdapter.getName());
         runner.getStateManager().clear(Scope.CLUSTER);
@@ -373,6 +380,86 @@ public class QueryDatabaseTableTest {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testAddedRowsAutoCommitTrue() throws SQLException, IOException 
{
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2");
+        runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
+        runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "true");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
2);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", 
flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(2, getNumberOfRecordsFromStream(in));
+
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+    }
+
+    @Test
+    public void testAddedRowsAutoCommitFalse() throws SQLException, 
IOException {
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_QUERY_DB_TABLE");
+        } catch (final SQLException sqle) {
+            // Ignore this error, probably a "table does not exist" since 
Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
+        }
+
+        stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, 
name varchar(100), scale float, created_on timestamp, bignum bigint default 
0)");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
+        stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, 
created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
+
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, 
"TEST_QUERY_DB_TABLE");
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
+        runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "2");
+        runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");
+        runner.setProperty(QueryDatabaseTable.AUTO_COMMIT, "false");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 
2);
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        assertEquals("TEST_QUERY_DB_TABLE", 
flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME));
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(2, getNumberOfRecordsFromStream(in));
+
+        flowFile = 
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(1);
+        assertEquals(flowFile.getAttribute("maxvalue.id"), "2");
+        in = new ByteArrayInputStream(flowFile.toByteArray());
+        assertEquals(1, getNumberOfRecordsFromStream(in));
+    }
+
     @Test
     public void testAddedRowsTwoTables() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
 
@@ -1461,7 +1548,7 @@ public class QueryDatabaseTableTest {
     }
 
     @Stateful(scopes = Scope.CLUSTER, description = "Mock for 
QueryDatabaseTable processor")
-    private static class MockQueryDatabaseTable extends QueryDatabaseTable {
+    protected static class MockQueryDatabaseTable extends QueryDatabaseTable {
         void putColumnType(String colName, Integer colType) {
             columnTypeMap.put(colName, colType);
         }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java
index ea10621867..6d866ef90f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/db/impl/TestPostgreSQLDatabaseAdapter.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -41,6 +42,21 @@ public class TestPostgreSQLDatabaseAdapter {
         assertTrue(testSubject.supportsUpsert(), 
testSubject.getClass().getSimpleName() + " should support upsert");
     }
 
+    @Test
+    public void getAutoCommitForReadsFetchSizeNull() {
+        assertEquals(Optional.empty(), 
testSubject.getAutoCommitForReads(null));
+    }
+
+    @Test
+    public void getAutoCommitForReadsFetchSizeZero() {
+        assertEquals(Optional.empty(), testSubject.getAutoCommitForReads(0));
+    }
+
+    @Test
+    public void getAutoCommitForReadsFetchSizeNonZero() {
+        assertEquals(Optional.of(Boolean.FALSE), 
testSubject.getAutoCommitForReads(1));
+    }
+
     @Test
     public void testGetUpsertStatementWithNullTableName() {
         testGetUpsertStatement(null, Arrays.asList("notEmpty"), 
Arrays.asList("notEmpty"), new IllegalArgumentException("Table name cannot be 
null or blank"));


Reply via email to