nifi git commit: NIFI-5823: Fixes typo in min idle connections property name

2018-11-15 Thread ijokarumawak
Repository: nifi
Updated Branches:
  refs/heads/master 0207d0813 -> 102a5288e


NIFI-5823: Fixes typo in min idle connections property name

So many people missed it :-(

This closes #3172.

Signed-off-by: Koji Kawamura 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/102a5288
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/102a5288
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/102a5288

Branch: refs/heads/master
Commit: 102a5288efb2a22cd54815dd7331dfc5826aee91
Parents: 0207d08
Author: Colin Dean 
Authored: Thu Nov 15 13:46:41 2018 -0500
Committer: Koji Kawamura 
Committed: Fri Nov 16 11:16:55 2018 +0900

--
 .../src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/102a5288/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
--
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
index 5228496..d6f83f2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
@@ -194,7 +194,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
 
 public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
 .displayName("Minimum Idle Connections")
-.name("dbcp-mim-idle-conns")
+.name("dbcp-min-idle-conns")
 .description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
 "created, or zero to create none.")
 .defaultValue(DEFAULT_MIN_IDLE)
@@ -438,4 +438,4 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
 BasicDataSource getDataSource() {
 return dataSource;
 }
-}
\ No newline at end of file
+}



nifi-minifi-cpp git commit: MINIFICPP-677: Change behavior of async callback

2018-11-15 Thread aldrin
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 1257e5291 -> d7885d6bd


MINIFICPP-677: Change behavior of async callback

This closes #441.

Signed-off-by: Aldrin Piri 


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/d7885d6b
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/d7885d6b
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/d7885d6b

Branch: refs/heads/master
Commit: d7885d6bd8dea00d584fe9fcbc3eb21a12d43205
Parents: 1257e52
Author: Marc Parisi 
Authored: Thu Nov 15 15:30:46 2018 -0500
Committer: Aldrin Piri 
Committed: Thu Nov 15 20:00:30 2018 -0500

--
 extensions/http-curl/client/HTTPClient.cpp  |  4 ++
 extensions/http-curl/client/HTTPStream.cpp  | 15 +++---
 extensions/http-curl/client/HTTPStream.h|  3 ++
 .../http-curl/tests/HTTPIntegrationBase.h   | 15 --
 .../http-curl/tests/HTTPSiteToSiteTests.cpp |  2 +-
 libminifi/include/utils/ByteArrayCallback.h |  2 +-
 libminifi/test/integration/IntegrationBase.h| 52 +---
 7 files changed, 50 insertions(+), 43 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPClient.cpp
--
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 7940c6e..d607664 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -99,6 +99,10 @@ HTTPClient::~HTTPClient() {
 curl_easy_cleanup(http_session_);
 http_session_ = nullptr;
   }
+  // forceClose ended up not being the issue in MINIFICPP-667, but leaving here
+  // out of good hygiene.
+  forceClose();
+  read_callback_.close();
   logger_->log_trace("Closing HTTPClient for %s", url_);
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPStream.cpp
--
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index 608870b..8735b61 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -37,9 +37,9 @@ HttpStream::HttpStream(std::shared_ptr client)
   written(0),
   // given the nature of the stream we don't want to slow libCURL, we will produce
   // a warning instead allowing us to adjust it server side or through the local configuration.
-  http_read_callback_(66560,true),
+  http_read_callback_(66560, true),
   started_(false),
-  logger_(logging::LoggerFactory::getLogger()){
+  logger_(logging::LoggerFactory::getLogger()) {
   // submit early on
 }
 
@@ -54,7 +54,7 @@ void HttpStream::seek(uint64_t offset) {
 }
 
 int HttpStream::writeData(std::vector , int buflen) {
-  if ((int)buf.capacity() < buflen) {
+  if ((int) buf.capacity() < buflen) {
 return -1;
   }
   return writeData(reinterpret_cast([0]), buflen);
@@ -70,11 +70,11 @@ int HttpStream::writeData(uint8_t *value, int size) {
 callback_.ptr = _callback_;
 callback_.pos = 0;
 http_client_->setUploadCallback(_);
-http_client_future_ = std::async(submit_client, http_client_);
+http_client_future_ = std::async(std::launch::async, submit_client, http_client_);
 started_ = true;
   }
 }
-http_callback_.process(value,size);
+http_callback_.process(value, size);
 return size;
   } else {
 return -1;
@@ -90,7 +90,7 @@ inline std::vector HttpStream::readBuffer(const T& t) {
 }
 
 int HttpStream::readData(std::vector , int buflen) {
-  if ((int)buf.capacity() < buflen) {
+  if ((int) buf.capacity() < buflen) {
 buf.resize(buflen);
   }
   int ret = readData(reinterpret_cast([0]), buflen);
@@ -109,11 +109,10 @@ int HttpStream::readData(uint8_t *buf, int buflen) {
 read_callback_.ptr = _read_callback_;
 read_callback_.pos = 0;
 http_client_->setReadCallback(_callback_);
-http_client_future_ = std::async(submit_read_client, http_client_, _read_callback_);
+http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, _read_callback_);
 started_ = true;
   }
 }
-
 return http_read_callback_.readFully((char*) buf, buflen);
 
   } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d7885d6b/extensions/http-curl/client/HTTPStream.h
--
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
index d3e5bca..3829e94 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ 

nifi git commit: NIFI-5604: Added property to allow empty FlowFile when no SQL generated by GenerateTableFetch

2018-11-15 Thread pwicks
Repository: nifi
Updated Branches:
  refs/heads/master 75906226a -> 0207d0813


NIFI-5604: Added property to allow empty FlowFile when no SQL generated by GenerateTableFetch

co-authored by: Peter Wicks 
Signed-off-by: Peter Wicks 

This closes #3075.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0207d081
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0207d081
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0207d081

Branch: refs/heads/master
Commit: 0207d0813ef164ee7227ded61fb10960a4842e2d
Parents: 7590622
Author: Matthew Burgess 
Authored: Thu Nov 1 22:09:13 2018 -0400
Committer: Peter Wicks 
Committed: Thu Nov 15 13:37:43 2018 -0700

--
 .../processors/standard/GenerateTableFetch.java | 104 +--
 .../standard/TestGenerateTableFetch.java|  47 -
 2 files changed, 118 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/0207d081/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index a547393..b4ba9fe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -142,6 +142,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 .build();
 
+static final PropertyDescriptor OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS = new PropertyDescriptor.Builder()
+.name("gen-table-output-flowfile-on-zero-results")
+.displayName("Output Empty FlowFile on Zero Results")
+.description("Depending on the specified properties, an execution of this processor may not result in any SQL statements generated. When this property "
++ "is true, an empty flow file will be generated (having the parent of the incoming flow file if present) and transferred to the 'success' relationship. "
++ "When this property is false, no output flow files will be generated.")
+.required(true)
+.allowableValues("true", "false")
+.defaultValue("false")
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.build();
+
 public static final Relationship REL_FAILURE = new Relationship.Builder()
 .name("failure")
 .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
@@ -164,6 +176,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 pds.add(PARTITION_SIZE);
 pds.add(COLUMN_FOR_VALUE_PARTITIONING);
 pds.add(WHERE_CLAUSE);
+pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS);
 propDescriptors = Collections.unmodifiableList(pds);
 }
 
@@ -247,6 +260,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
 final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning);
 final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
+final boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();
 
 final StateManager stateManager = context.getStateManager();
 final StateMap stateMap;
@@ -435,49 +449,75 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 }
 
 // Generate SQL statements to read "pages" of data
-Long limit = partitionSize == 0 ? null : (long) partitionSize;
 final String fragmentIdentifier = UUID.randomUUID().toString();
-for (long i = 0; i < numberOfFetches; i++) {
-// Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
-if ((i == numberOfFetches - 1) && 
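
In practice the new OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS property comes down to: when the processor generates no SQL and the property is true, emit an empty FlowFile to 'success' so downstream flows can still react even when there is nothing to fetch. Below is a minimal sketch of that behavior against the standard ProcessSession API; the helper name and wiring are illustrative only, not the committed GenerateTableFetch code (the hunk above is truncated):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;

    final class EmptyResultHelper {
        private EmptyResultHelper() {}

        // Hypothetical helper: emit an empty FlowFile when no SQL was generated.
        static void emitEmptyFlowFileIfRequested(final ProcessSession session, final FlowFile incoming,
                                                 final boolean outputEmptyFlowFileOnZeroResults,
                                                 final Relationship success) {
            if (!outputEmptyFlowFileOnZeroResults) {
                return; // property is false: no output FlowFile at all
            }
            // create(parent) inherits the incoming FlowFile's attributes and lineage; create() starts fresh
            final FlowFile empty = (incoming == null) ? session.create() : session.create(incoming);
            session.transfer(empty, success);
        }
    }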

nifi git commit: NIFI-5780 Add pre and post statements to ExecuteSQL and ExecuteSQLRecord

2018-11-15 Thread pwicks
Repository: nifi
Updated Branches:
  refs/heads/master be0949570 -> 75906226a


NIFI-5780 Add pre and post statements to ExecuteSQL and ExecuteSQLRecord

Signed-off-by: Peter Wicks 

This closes #3156.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/75906226
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/75906226
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/75906226

Branch: refs/heads/master
Commit: 75906226a6f265acc7d87414ff477a5b0646b6d8
Parents: be09495
Author: yjhyjhyjh0 
Authored: Thu Nov 8 00:25:50 2018 +0800
Committer: Peter Wicks 
Committed: Thu Nov 15 13:18:31 2018 -0700

--
 .../nifi/processors/hive/SelectHiveQL.java  |   4 +-
 .../processors/standard/AbstractExecuteSQL.java |  79 +-
 .../nifi/processors/standard/ExecuteSQL.java|   2 +
 .../processors/standard/ExecuteSQLRecord.java   |   2 +
 .../processors/standard/TestExecuteSQL.java | 146 +++
 .../standard/TestExecuteSQLRecord.java  | 132 +
 6 files changed, 362 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
--
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index 5342c09..3b8576b 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -109,7 +109,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
 public static final PropertyDescriptor HIVEQL_PRE_QUERY = new PropertyDescriptor.Builder()
 .name("hive-pre-query")
 .displayName("HiveQL Pre-Query")
-.description("HiveQL pre-query to execute. Semicolon-delimited list of queries. "
+.description("A semicolon-delimited list of queries executed before the main SQL query is executed. "
 + "Example: 'set tez.queue.name=queue1; set hive.exec.orc.split.strategy=ETL; set hive.exec.reducers.bytes.per.reducer=1073741824'. "
 + "Note, the results/outputs of these queries will be suppressed if successfully executed.")
 .required(false)
@@ -129,7 +129,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
 public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder()
 .name("hive-post-query")
 .displayName("HiveQL Post-Query")
-.description("HiveQL post-query to execute. Semicolon-delimited list of queries. "
+.description("A semicolon-delimited list of queries executed after the main SQL query is executed. "
 + "Note, the results/outputs of these queries will be suppressed if successfully executed.")
 .required(false)
 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)

http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index bf46549..d1fabef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
@@ -44,6 +45,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -82,6 +84,17 @@ public abstract class AbstractExecuteSQL extends 
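
As the updated descriptions above spell out, the pre- and post-query properties accept a semicolon-delimited list of statements that run before and after the main query, with their outputs discarded. A rough sketch of that execution pattern over plain JDBC follows; it is illustrative only, not the committed AbstractExecuteSQL code (cut off above), and its naive split on ';' ignores semicolons inside quoted literals:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    final class DelimitedStatementRunner {
        private DelimitedStatementRunner() {}

        // Execute each non-blank statement in a semicolon-delimited list, discarding any results.
        static void runAll(final Connection con, final String delimited) throws SQLException {
            if (delimited == null || delimited.trim().isEmpty()) {
                return;
            }
            try (final Statement st = con.createStatement()) {
                for (final String sql : delimited.split(";")) {
                    if (!sql.trim().isEmpty()) {
                        st.execute(sql.trim()); // outputs of pre/post statements are suppressed
                    }
                }
            }
        }
    }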

nifi git commit: NIFI-5824: Added unit test to FlowController to ensure that the ProcessScheduler that it creates is properly initialized. Also updated the properties file used by TestFlowController t

2018-11-15 Thread bbende
Repository: nifi
Updated Branches:
  refs/heads/master 76b0065a6 -> be0949570


NIFI-5824: Added unit test to FlowController to ensure that the 
ProcessScheduler that it creates is properly initialized. Also updated the 
properties file used by TestFlowController to use a VolatileContentRepository 
instead of FileSystemRepository, and fixed EventDrivenWorkerQueue to return if 
calls to poll() are interrupted (via Thread.interrupt) - making these minor 
fixes resulted in the unit test TestFlowController running in 2 seconds instead 
of 30+ seconds on my machine

This closes #3173.

Signed-off-by: Bryan Bende 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/be094957
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/be094957
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/be094957

Branch: refs/heads/master
Commit: be0949570a66f672e128ac97c936df546c7d2521
Parents: 76b0065
Author: Mark Payne 
Authored: Thu Nov 15 14:26:36 2018 -0500
Committer: Bryan Bende 
Committed: Thu Nov 15 15:01:56 2018 -0500

--
 .../nifi/controller/EventDrivenWorkerQueue.java | 12 ++-
 .../scheduling/StandardProcessScheduler.java|  9 +++--
 .../nifi/controller/TestFlowController.java | 21 
 .../flowcontrollertest.nifi.properties  |  3 +++
 4 files changed, 38 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
--
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
index f36a459..25e8a86 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
@@ -16,6 +16,11 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.Connectables;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -25,11 +30,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.util.Connectables;
-
 public class EventDrivenWorkerQueue implements WorkerQueue {
 
 private final Object workMonitor = new Object();
@@ -69,6 +69,8 @@ public class EventDrivenWorkerQueue implements WorkerQueue {
 try {
 workMonitor.wait(timeLeft);
 } catch (final InterruptedException ignored) {
+Thread.currentThread().interrupt();
+return null;
 }
 } else {
 // Decrement the amount of work there is to do for this worker.

http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
--
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 2902f6a..2ff3307 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -35,6 +35,7 @@ import org.apache.nifi.controller.SchedulingAgentCallback;
 import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import 
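
The EventDrivenWorkerQueue portion of this change applies the standard Java idiom for blocking waits: when wait() throws InterruptedException, re-assert the thread's interrupt status and return promptly instead of swallowing the exception. A stripped-down sketch of that idiom (not the actual WorkerQueue class):

    import java.util.ArrayDeque;
    import java.util.Queue;

    final class InterruptiblePollQueue<T> {
        private final Object monitor = new Object();
        private final Queue<T> items = new ArrayDeque<>();

        T poll(final long timeoutMillis) {
            synchronized (monitor) {
                if (items.isEmpty()) {
                    try {
                        monitor.wait(timeoutMillis);
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt for callers
                        return null;                        // stop waiting so the worker thread can exit
                    }
                }
                return items.poll();
            }
        }

        void offer(final T item) {
            synchronized (monitor) {
                items.add(item);
                monitor.notifyAll();
            }
        }
    }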

nifi git commit: NIFI-5822: Ensure that we don't call FlowController.getControllerServiceProvider() before the ControllerServiceProvider has been initialized

2018-11-15 Thread bbende
Repository: nifi
Updated Branches:
  refs/heads/master d319a3ef2 -> 76b0065a6


NIFI-5822: Ensure that we don't call 
FlowController.getControllerServiceProvider() before the 
ControllerServiceProvider has been initialized

This closes #3171.

Signed-off-by: Bryan Bende 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/76b0065a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/76b0065a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/76b0065a

Branch: refs/heads/master
Commit: 76b0065a67c0e3270d44d85e1933c41bfaba8841
Parents: d319a3e
Author: Mark Payne 
Authored: Thu Nov 15 12:33:03 2018 -0500
Committer: Bryan Bende 
Committed: Thu Nov 15 13:25:33 2018 -0500

--
 .../nifi/controller/scheduling/StandardProcessScheduler.java  | 7 ++-
 1 file changed, 2 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/76b0065a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
--
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 6313097..2902f6a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -35,7 +35,6 @@ import org.apache.nifi.controller.SchedulingAgentCallback;
 import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.logging.ComponentLog;
@@ -72,7 +71,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
 private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class);
 
-private final ControllerServiceProvider controllerServiceProvider;
 private final FlowController flowController;
 private final long administrativeYieldMillis;
 private final String administrativeYieldDuration;
@@ -92,7 +90,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 public StandardProcessScheduler(final FlowEngine componentLifecycleThreadPool, final FlowController flowController, final StringEncryptor encryptor,
 final StateManagerProvider stateManagerProvider, final NiFiProperties nifiProperties) {
 this.componentLifeCycleThreadPool = componentLifecycleThreadPool;
-this.controllerServiceProvider = flowController.getControllerServiceProvider();
 this.flowController = flowController;
 this.encryptor = encryptor;
 this.stateManagerProvider = stateManagerProvider;
@@ -296,7 +293,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 public synchronized CompletableFuture startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
 final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
 
-final StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
+final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
 this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
 
 final CompletableFuture future = new CompletableFuture<>();
@@ -336,7 +333,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 public synchronized CompletableFuture stopProcessor(final ProcessorNode procNode) {
 final LifecycleState lifecycleState = getLifecycleState(procNode, false);
 
-StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
+StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
 this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
 
 LOG.info("Stopping {}", procNode);
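
The fix above replaces a value cached in the constructor with a lookup at the point of use: the scheduler is constructed before the ControllerServiceProvider is available, so flowController.getControllerServiceProvider() is only safe to call later, when a processor is actually started or stopped. A simplified sketch of that ordering hazard, using hypothetical classes rather than NiFi code:

    // Hypothetical classes illustrating why the lookup is deferred; not NiFi code.
    class Provider {
        private volatile String service;                 // becomes non-null only after initialize()
        void initialize() { service = "ready"; }
        String getService() { return service; }
    }

    class EagerConsumer {
        private final String cached;
        EagerConsumer(final Provider p) { this.cached = p.getService(); } // null if p is not initialized yet
        String use() { return cached; }
    }

    class LazyConsumer {
        private final Provider provider;
        LazyConsumer(final Provider p) { this.provider = p; }
        String use() { return provider.getService(); }   // resolved at call time, after initialize()
    }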



nifi git commit: NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor

2018-11-15 Thread mattyb149
Repository: nifi
Updated Branches:
  refs/heads/master 13011ac6d -> d319a3ef2


NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor

NIFI-5788: Introduce batch size limit in PutDatabaseRecord processor

Renamed 'batch size' to 'Maximum Batch Size'.
Changed default value of max_batch_size to zero (INFINITE)
Fixed parameter validation.
Added unit tests

Signed-off-by: Matthew Burgess 

This closes #3128


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d319a3ef
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d319a3ef
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d319a3ef

Branch: refs/heads/master
Commit: d319a3ef2f14317f29a1be5a189bc34f8b3fdbd6
Parents: 13011ac
Author: vadimar 
Authored: Mon Nov 5 13:15:12 2018 +0200
Committer: Matthew Burgess 
Committed: Thu Nov 15 10:31:34 2018 -0500

--
 .../processors/standard/PutDatabaseRecord.java  |  29 +-
 .../standard/TestPutDatabaseRecord.groovy   | 103 +++
 2 files changed, 130 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/d319a3ef/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 2f2d901..d79cf3c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -275,6 +275,17 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 .required(true)
 .build();
 
+static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
+.name("put-db-record-max-batch-size")
+.displayName("Maximum Batch Size")
+.description("Specifies maximum batch size for INSERT and UPDATE statements. This parameter has no effect for other statements specified in 'Statement Type'."
++ " Zero means the batch size is not limited.")
+.defaultValue("0")
+.required(false)
+.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
 protected static List propDescriptors;
 
 private Cache schemaCache;
@@ -303,6 +314,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 pds.add(QUERY_TIMEOUT);
 pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
 pds.add(TABLE_SCHEMA_CACHE_SIZE);
+pds.add(MAX_BATCH_SIZE);
 
 propDescriptors = Collections.unmodifiableList(pds);
 }
@@ -641,6 +653,10 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 Record currentRecord;
 List fieldIndexes = sqlHolder.getFieldIndexes();
 
+final Integer maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
+int currentBatchSize = 0;
+int batchIndex = 0;
+
 while ((currentRecord = recordParser.nextRecord()) != null) {
 Object[] values = currentRecord.getValues();
 if (values != null) {
@@ -667,11 +683,20 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 }
 }
 ps.addBatch();
+if (++currentBatchSize == maxBatchSize) {
+batchIndex++;
+log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize});
+ps.executeBatch();
+currentBatchSize = 0;
+}
 }
 }
 
-log.debug("Executing query {}", new Object[]{sqlHolder});
-ps.executeBatch();
+if (currentBatchSize > 0) {
+batchIndex++;
+log.debug("Executing query {}; fieldIndexes: {}; batch index: {}; batch size: {}", new Object[]{sqlHolder.getSql(), sqlHolder.getFieldIndexes(), batchIndex, currentBatchSize});
+
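
The batching logic in this hunk follows the usual JDBC pattern: rows are added with addBatch(), the batch is executed whenever the configured Maximum Batch Size is reached, and any remainder is flushed at the end (a value of zero leaves the batch unbounded). A self-contained sketch of that pattern with an illustrative table and column, not the PutDatabaseRecord implementation itself (the diff above is truncated):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    final class BatchedInsertExample {
        private BatchedInsertExample() {}

        // Insert rows in batches of at most maxBatchSize; 0 means "no limit" (one batch at the end).
        static void insertInBatches(final Connection con, final List<String> names, final int maxBatchSize)
                throws SQLException {
            try (final PreparedStatement ps = con.prepareStatement("INSERT INTO example_table (name) VALUES (?)")) {
                int currentBatchSize = 0;
                for (final String name : names) {
                    ps.setString(1, name);
                    ps.addBatch();
                    currentBatchSize++;
                    if (maxBatchSize > 0 && currentBatchSize >= maxBatchSize) {
                        ps.executeBatch();      // flush a full batch
                        currentBatchSize = 0;
                    }
                }
                if (currentBatchSize > 0) {
                    ps.executeBatch();          // flush the remainder (everything, when maxBatchSize is 0)
                }
            }
        }
    }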