nifi git commit: NIFI-5823: Fixes typo in min idle connections property name
Repository: nifi Updated Branches: refs/heads/master 0207d0813 -> 102a5288e NIFI-5823: Fixes typo in min idle connections property name So many people missed it :-( This closes #3172. Signed-off-by: Koji Kawamura Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/102a5288 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/102a5288 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/102a5288 Branch: refs/heads/master Commit: 102a5288efb2a22cd54815dd7331dfc5826aee91 Parents: 0207d08 Author: Colin Dean Authored: Thu Nov 15 13:46:41 2018 -0500 Committer: Koji Kawamura Committed: Fri Nov 16 11:16:55 2018 +0900 -- .../src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/102a5288/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java -- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java index 5228496..d6f83f2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java @@ -194,7 +194,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() .displayName("Minimum Idle Connections") -.name("dbcp-mim-idle-conns") +.name("dbcp-min-idle-conns") .description("The minimum number of connections that can remain idle in 
the pool, without extra ones being " + "created, or zero to create none.") .defaultValue(DEFAULT_MIN_IDLE) @@ -438,4 +438,4 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC BasicDataSource getDataSource() { return dataSource; } -} \ No newline at end of file +}
nifi-minifi-cpp git commit: MINIFICPP-677: Change behavior of async callback
Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 1257e5291 -> d7885d6bd MINIFICPP-677: Change behavior of async callback This closes #441. Signed-off-by: Aldrin Piri Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/d7885d6b Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/d7885d6b Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/d7885d6b Branch: refs/heads/master Commit: d7885d6bd8dea00d584fe9fcbc3eb21a12d43205 Parents: 1257e52 Author: Marc Parisi Authored: Thu Nov 15 15:30:46 2018 -0500 Committer: Aldrin Piri Committed: Thu Nov 15 20:00:30 2018 -0500 -- extensions/http-curl/client/HTTPClient.cpp | 4 ++ extensions/http-curl/client/HTTPStream.cpp | 15 +++--- extensions/http-curl/client/HTTPStream.h| 3 ++ .../http-curl/tests/HTTPIntegrationBase.h | 15 -- .../http-curl/tests/HTTPSiteToSiteTests.cpp | 2 +- libminifi/include/utils/ByteArrayCallback.h | 2 +- libminifi/test/integration/IntegrationBase.h| 52 +--- 7 files changed, 50 insertions(+), 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPClient.cpp -- diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp index 7940c6e..d607664 100644 --- a/extensions/http-curl/client/HTTPClient.cpp +++ b/extensions/http-curl/client/HTTPClient.cpp @@ -99,6 +99,10 @@ HTTPClient::~HTTPClient() { curl_easy_cleanup(http_session_); http_session_ = nullptr; } + // forceClose ended up not being the issue in MINIFICPP-667, but leaving here + // out of good hygiene. 
+ forceClose(); + read_callback_.close(); logger_->log_trace("Closing HTTPClient for %s", url_); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPStream.cpp -- diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp index 608870b..8735b61 100644 --- a/extensions/http-curl/client/HTTPStream.cpp +++ b/extensions/http-curl/client/HTTPStream.cpp @@ -37,9 +37,9 @@ HttpStream::HttpStream(std::shared_ptr client) written(0), // given the nature of the stream we don't want to slow libCURL, we will produce // a warning instead allowing us to adjust it server side or through the local configuration. - http_read_callback_(66560,true), + http_read_callback_(66560, true), started_(false), - logger_(logging::LoggerFactory::getLogger()){ + logger_(logging::LoggerFactory::getLogger()) { // submit early on } @@ -54,7 +54,7 @@ void HttpStream::seek(uint64_t offset) { } int HttpStream::writeData(std::vector , int buflen) { - if ((int)buf.capacity() < buflen) { + if ((int) buf.capacity() < buflen) { return -1; } return writeData(reinterpret_cast([0]), buflen); @@ -70,11 +70,11 @@ int HttpStream::writeData(uint8_t *value, int size) { callback_.ptr = _callback_; callback_.pos = 0; http_client_->setUploadCallback(_); -http_client_future_ = std::async(submit_client, http_client_); +http_client_future_ = std::async(std::launch::async, submit_client, http_client_); started_ = true; } } -http_callback_.process(value,size); +http_callback_.process(value, size); return size; } else { return -1; @@ -90,7 +90,7 @@ inline std::vector HttpStream::readBuffer(const T& t) { } int HttpStream::readData(std::vector , int buflen) { - if ((int)buf.capacity() < buflen) { + if ((int) buf.capacity() < buflen) { buf.resize(buflen); } int ret = readData(reinterpret_cast([0]), buflen); @@ -109,11 +109,10 @@ int HttpStream::readData(uint8_t *buf, int buflen) { read_callback_.ptr = _read_callback_; read_callback_.pos 
= 0; http_client_->setReadCallback(_callback_); -http_client_future_ = std::async(submit_read_client, http_client_, _read_callback_); +http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, _read_callback_); started_ = true; } } - return http_read_callback_.readFully((char*) buf, buflen); } else { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPStream.h -- diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h index d3e5bca..3829e94 100644 --- a/extensions/http-curl/client/HTTPStream.h +++
nifi git commit: NIFI-5604: Added property to allow empty FlowFile when no SQL generated by GenerateTableFetch
Repository: nifi Updated Branches: refs/heads/master 75906226a -> 0207d0813 NIFI-5604: Added property to allow empty FlowFile when no SQL generated by GenerateTableFetch co-authored by: Peter Wicks Signed-off-by: Peter Wicks This closes #3075. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0207d081 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0207d081 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0207d081 Branch: refs/heads/master Commit: 0207d0813ef164ee7227ded61fb10960a4842e2d Parents: 7590622 Author: Matthew Burgess Authored: Thu Nov 1 22:09:13 2018 -0400 Committer: Peter Wicks Committed: Thu Nov 15 13:37:43 2018 -0700 -- .../processors/standard/GenerateTableFetch.java | 104 +-- .../standard/TestGenerateTableFetch.java| 47 - 2 files changed, 118 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/0207d081/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index a547393..b4ba9fe 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -142,6 +142,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); +static final PropertyDescriptor OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS = new PropertyDescriptor.Builder() 
+.name("gen-table-output-flowfile-on-zero-results") +.displayName("Output Empty FlowFile on Zero Results") +.description("Depending on the specified properties, an execution of this processor may not result in any SQL statements generated. When this property " ++ "is true, an empty flow file will be generated (having the parent of the incoming flow file if present) and transferred to the 'success' relationship. " ++ "When this property is false, no output flow files will be generated.") +.required(true) +.allowableValues("true", "false") +.defaultValue("false") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. " @@ -164,6 +176,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { pds.add(PARTITION_SIZE); pds.add(COLUMN_FOR_VALUE_PARTITIONING); pds.add(WHERE_CLAUSE); +pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS); propDescriptors = Collections.unmodifiableList(pds); } @@ -247,6 +260,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue(); final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning); final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue(); +final boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean(); final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -435,49 +449,75 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { } // Generate SQL statements to read "pages" of data -Long 
limit = partitionSize == 0 ? null : (long) partitionSize; final String fragmentIdentifier = UUID.randomUUID().toString(); -for (long i = 0; i < numberOfFetches; i++) { -// Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit) -if ((i == numberOfFetches - 1) &&
nifi git commit: NIFI-5780 Add pre and post statements to ExecuteSQL and ExecuteSQLRecord
Repository: nifi Updated Branches: refs/heads/master be0949570 -> 75906226a NIFI-5780 Add pre and post statements to ExecuteSQL and ExecuteSQLRecord Signed-off-by: Peter Wicks This closes #3156. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/75906226 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/75906226 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/75906226 Branch: refs/heads/master Commit: 75906226a6f265acc7d87414ff477a5b0646b6d8 Parents: be09495 Author: yjhyjhyjh0 Authored: Thu Nov 8 00:25:50 2018 +0800 Committer: Peter Wicks Committed: Thu Nov 15 13:18:31 2018 -0700 -- .../nifi/processors/hive/SelectHiveQL.java | 4 +- .../processors/standard/AbstractExecuteSQL.java | 79 +- .../nifi/processors/standard/ExecuteSQL.java| 2 + .../processors/standard/ExecuteSQLRecord.java | 2 + .../processors/standard/TestExecuteSQL.java | 146 +++ .../standard/TestExecuteSQLRecord.java | 132 + 6 files changed, 362 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java -- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index 5342c09..3b8576b 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -109,7 +109,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { public static final PropertyDescriptor HIVEQL_PRE_QUERY = new PropertyDescriptor.Builder() .name("hive-pre-query") .displayName("HiveQL Pre-Query") -.description("HiveQL 
pre-query to execute. Semicolon-delimited list of queries. " +.description("A semicolon-delimited list of queries executed before the main SQL query is executed. " + "Example: 'set tez.queue.name=queue1; set hive.exec.orc.split.strategy=ETL; set hive.exec.reducers.bytes.per.reducer=1073741824'. " + "Note, the results/outputs of these queries will be suppressed if successfully executed.") .required(false) @@ -129,7 +129,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder() .name("hive-post-query") .displayName("HiveQL Post-Query") -.description("HiveQL post-query to execute. Semicolon-delimited list of queries. " +.description("A semicolon-delimited list of queries executed after the main SQL query is executed. " + "Note, the results/outputs of these queries will be suppressed if successfully executed.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index bf46549..d1fabef 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.annotation.lifecycle.OnScheduled; 
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.DBCPService; @@ -44,6 +45,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -82,6 +84,17 @@ public abstract class AbstractExecuteSQL extends
nifi git commit: NIFI-5824: Added unit test to FlowController to ensure that the ProcessScheduler that it creates is properly initialized. Also updated the properties file used by TestFlowController to use a VolatileContentRepository instead of FileSystemRepository [...]
Repository: nifi Updated Branches: refs/heads/master 76b0065a6 -> be0949570 NIFI-5824: Added unit test to FlowController to ensure that the ProcessScheduler that it creates is properly initialized. Also updated the properties file used by TestFlowController to use a VolatileContentRepository instead of FileSystemRepository, and fixed EventDrivenWorkerQueue to return if calls to poll() are interrupted (via Thread.interrupt) - making these minor fixes resulted in the unit test TestFlowController running in 2 seconds instead of 30+ seconds on my machine This closes #3173. Signed-off-by: Bryan Bende Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/be094957 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/be094957 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/be094957 Branch: refs/heads/master Commit: be0949570a66f672e128ac97c936df546c7d2521 Parents: 76b0065 Author: Mark Payne Authored: Thu Nov 15 14:26:36 2018 -0500 Committer: Bryan Bende Committed: Thu Nov 15 15:01:56 2018 -0500 -- .../nifi/controller/EventDrivenWorkerQueue.java | 12 ++- .../scheduling/StandardProcessScheduler.java| 9 +++-- .../nifi/controller/TestFlowController.java | 21 .../flowcontrollertest.nifi.properties | 3 +++ 4 files changed, 38 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java -- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java index f36a459..25e8a86 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java @@ -16,6 +16,11 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.Connectables; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -25,11 +30,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.util.Connectables; - public class EventDrivenWorkerQueue implements WorkerQueue { private final Object workMonitor = new Object(); @@ -69,6 +69,8 @@ public class EventDrivenWorkerQueue implements WorkerQueue { try { workMonitor.wait(timeLeft); } catch (final InterruptedException ignored) { +Thread.currentThread().interrupt(); +return null; } } else { // Decrement the amount of work there is to do for this worker. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java -- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 2902f6a..2ff3307 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -35,6 +35,7 @@ import org.apache.nifi.controller.SchedulingAgentCallback; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.service.ControllerServiceNode; +import
nifi git commit: NIFI-5822: Ensure that we don't call FlowController.getControllerServiceProvider() before the ControllerServiceProvider has been initialized
Repository: nifi Updated Branches: refs/heads/master d319a3ef2 -> 76b0065a6 NIFI-5822: Ensure that we don't call FlowController.getControllerServiceProvider() before the ControllerServiceProvider has been initialized This closes #3171. Signed-off-by: Bryan Bende Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/76b0065a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/76b0065a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/76b0065a Branch: refs/heads/master Commit: 76b0065a67c0e3270d44d85e1933c41bfaba8841 Parents: d319a3e Author: Mark Payne Authored: Thu Nov 15 12:33:03 2018 -0500 Committer: Bryan Bende Committed: Thu Nov 15 13:25:33 2018 -0500 -- .../nifi/controller/scheduling/StandardProcessScheduler.java | 7 ++- 1 file changed, 2 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/76b0065a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java -- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 6313097..2902f6a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -35,7 +35,6 @@ import org.apache.nifi.controller.SchedulingAgentCallback; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import 
org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.logging.ComponentLog; @@ -72,7 +71,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class); -private final ControllerServiceProvider controllerServiceProvider; private final FlowController flowController; private final long administrativeYieldMillis; private final String administrativeYieldDuration; @@ -92,7 +90,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { public StandardProcessScheduler(final FlowEngine componentLifecycleThreadPool, final FlowController flowController, final StringEncryptor encryptor, final StateManagerProvider stateManagerProvider, final NiFiProperties nifiProperties) { this.componentLifeCycleThreadPool = componentLifecycleThreadPool; -this.controllerServiceProvider = flowController.getControllerServiceProvider(); this.flowController = flowController; this.encryptor = encryptor; this.stateManagerProvider = stateManagerProvider; @@ -296,7 +293,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { public synchronized CompletableFuture startProcessor(final ProcessorNode procNode, final boolean failIfStopping) { final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true); -final StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, +final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(), this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated); final CompletableFuture future = new CompletableFuture<>(); @@ -336,7 +333,7 @@ public final class 
StandardProcessScheduler implements ProcessScheduler { public synchronized CompletableFuture stopProcessor(final ProcessorNode procNode) { final LifecycleState lifecycleState = getLifecycleState(procNode, false); -StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider, +StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(), this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated); LOG.info("Stopping {}", procNode);
nifi git commit: NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor
Repository: nifi Updated Branches: refs/heads/master 13011ac6d -> d319a3ef2 NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor Renamed 'batch size' to 'Maximum Batch Size'. Changed default value of max_batch_size to zero (INFINITE) Fixed parameter validation. Added unit tests Signed-off-by: Matthew Burgess This closes #3128 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d319a3ef Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d319a3ef Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d319a3ef Branch: refs/heads/master Commit: d319a3ef2f14317f29a1be5a189bc34f8b3fdbd6 Parents: 13011ac Author: vadimar Authored: Mon Nov 5 13:15:12 2018 +0200 Committer: Matthew Burgess Committed: Thu Nov 15 10:31:34 2018 -0500 -- .../processors/standard/PutDatabaseRecord.java | 29 +- .../standard/TestPutDatabaseRecord.groovy | 103 +++ 2 files changed, 130 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/d319a3ef/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 2f2d901..d79cf3c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -275,6 +275,17 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { .required(true) .build(); +static final PropertyDescriptor 
MAX_BATCH_SIZE = new PropertyDescriptor.Builder() +.name("put-db-record-max-batch-size") +.displayName("Maximum Batch Size") +.description("Specifies maximum batch size for INSERT and UPDATE statements. This parameter has no effect for other statements specified in 'Statement Type'." ++ " Zero means the batch size is not limited.") +.defaultValue("0") +.required(false) +.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + protected static List propDescriptors; private Cache schemaCache; @@ -303,6 +314,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { pds.add(QUERY_TIMEOUT); pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); pds.add(TABLE_SCHEMA_CACHE_SIZE); +pds.add(MAX_BATCH_SIZE); propDescriptors = Collections.unmodifiableList(pds); } @@ -641,6 +653,10 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { Record currentRecord; List fieldIndexes = sqlHolder.getFieldIndexes(); +final Integer maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); +int currentBatchSize = 0; +int batchIndex = 0; + while ((currentRecord = recordParser.nextRecord()) != null) { Object[] values = currentRecord.getValues(); if (values != null) { @@ -667,11 +683,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { } } ps.addBatch(); +if (++currentBatchSize == maxBatchSize) { +batchIndex++; +log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize}); +ps.executeBatch(); +currentBatchSize = 0; +} } } -log.debug("Executing query {}", new Object[]{sqlHolder}); -ps.executeBatch(); +if (currentBatchSize > 0) { +batchIndex++; +log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), 
sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize}); +