[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2023-12-19 Thread Matt Burgess (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798751#comment-17798751
 ] 

Matt Burgess commented on NIFI-5642:


The "maxRowsPerFlowFile" variable is changed to match the rows available 
without fetching (default is 5000), so only 5000 rows get put into the FlowFile 
even if the Max Rows Per Flow File property is set to zero. I have a test table 
with 20k rows; if I run QueryCassandra once, I get 4 output FlowFiles when I 
should get only one. There's a logic error or two in there that need to be 
fixed, but since it has been released as of 1.22.0 and 2.0.0-M1, I will open a 
new Jira to fix them.
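A minimal sketch of the arithmetic in this report. It assumes that 0 means "no limit" for Max Rows Per Flow File, and that the buggy path simply replaces the configured value with the number of rows the driver has buffered without fetching (one page, 5000 by default). `RowLimitDemo` and its methods are hypothetical names for illustration, not NiFi code.

```java
/**
 * Hypothetical illustration (not NiFi code) of the reported behavior:
 * the configured row limit is overwritten by the size of the first page.
 */
public final class RowLimitDemo {

    private RowLimitDemo() {
    }

    /** FlowFiles emitted for totalRows (> 0); a limit of 0 or less means one FlowFile. */
    public static long flowFileCount(long totalRows, long maxRowsPerFlowFile) {
        if (totalRows <= 0) {
            throw new IllegalArgumentException("totalRows must be positive");
        }
        if (maxRowsPerFlowFile <= 0) {
            return 1;
        }
        // Ceiling division: a partially filled last FlowFile still counts.
        return (totalRows + maxRowsPerFlowFile - 1) / maxRowsPerFlowFile;
    }

    /** Reported behavior: configuredMax is ignored in favor of the buffered page size. */
    public static long buggyCount(long totalRows, long configuredMax, long rowsAvailableWithoutFetching) {
        long maxRowsPerFlowFile = rowsAvailableWithoutFetching; // configuredMax is never used
        return flowFileCount(totalRows, maxRowsPerFlowFile);
    }

    /** Intended behavior: the configured limit is used as-is. */
    public static long intendedCount(long totalRows, long configuredMax) {
        return flowFileCount(totalRows, configuredMax);
    }

    public static void main(String[] args) {
        // 20k-row table, Max Rows Per Flow File = 0, default page of 5000 rows
        System.out.println(buggyCount(20_000, 0, 5_000)); // 4
        System.out.println(intendedCount(20_000, 0));     // 1
    }
}
```

Running `main` prints 4 and then 1, matching the four FlowFiles observed versus the single FlowFile expected.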

> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -
>
> Key: NIFI-5642
> URL: https://issues.apache.org/jira/browse/NIFI-5642
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.7.1
>Reporter: André Gomes Lamas Otero
>Assignee: Levi Lentz
>Priority: Major
> Fix For: 2.0.0-M1, 1.22.0
>
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> When I use QueryCassandra with the fetch_size parameter, I expect that as soon 
> as my reader reaches the fetch_size, the processor outputs some data to be 
> processed by the next processor; instead, QueryCassandra reads all the data 
> and only then outputs the flow files.
> I'll start working on a patch for this situation and would appreciate any 
> suggestions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2023-12-13 Thread Jira


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796476#comment-17796476
 ] 

André Gomes Lamas Otero commented on NIFI-5642:
---

[~mattyb149] check the logic in AbstractCassandraProcessor. It is precisely 
what you are describing. 
The code updates from levilentz (https://github.com/apache/nifi/pull/6848) 
were made on top of the old 2018 history, hence the misleading commit messages 
from the old history.



[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2023-12-13 Thread Matt Burgess (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796449#comment-17796449
 ] 

Matt Burgess commented on NIFI-5642:


That's not what Fetch Size is for; it's the number of rows the client asks for 
at a time. The Max Rows Per Flow File property is what should be used to send 
FlowFiles downstream when they're "full". Any commits should be reverted, as 
this will change the behavior other users expect.
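To make the distinction concrete, here is a small simulation, not NiFi or DataStax driver code: rows arrive in pages of `fetchSize` (each page standing in for one round trip to Cassandra), while an output FlowFile is closed only when it reaches `maxRowsPerFlowFile` (0 = unlimited). `FetchVsFlowFile`, `Result`, and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simulation (not NiFi code): fetch size drives paging, while Max Rows Per Flow File
 * alone decides when an output FlowFile is full.
 */
public final class FetchVsFlowFile {

    private FetchVsFlowFile() {
    }

    /** Outcome of one simulated query run. */
    public static final class Result {
        public final int pagesFetched;
        public final List<Integer> rowsPerFlowFile;

        Result(int pagesFetched, List<Integer> rowsPerFlowFile) {
            this.pagesFetched = pagesFetched;
            this.rowsPerFlowFile = rowsPerFlowFile;
        }

        @Override
        public String toString() {
            return "pages=" + pagesFetched + ", flowFiles=" + rowsPerFlowFile;
        }
    }

    public static Result run(int totalRows, int fetchSize, int maxRowsPerFlowFile) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        int pagesFetched = 0;
        int bufferedRows = 0;              // rows available without fetching
        int rowsLeftOnServer = totalRows;
        int rowsInCurrentFlowFile = 0;
        List<Integer> flowFiles = new ArrayList<>();

        for (int row = 0; row < totalRows; row++) {
            if (bufferedRows == 0) {
                // Page boundary: request the next page of up to fetchSize rows.
                bufferedRows = Math.min(fetchSize, rowsLeftOnServer);
                rowsLeftOnServer -= bufferedRows;
                pagesFetched++;
            }
            bufferedRows--;
            rowsInCurrentFlowFile++;
            // Only the per-FlowFile limit closes a FlowFile; page boundaries do not.
            if (maxRowsPerFlowFile > 0 && rowsInCurrentFlowFile == maxRowsPerFlowFile) {
                flowFiles.add(rowsInCurrentFlowFile);
                rowsInCurrentFlowFile = 0;
            }
        }
        if (rowsInCurrentFlowFile > 0) {
            flowFiles.add(rowsInCurrentFlowFile); // last, possibly partial, FlowFile
        }
        return new Result(pagesFetched, flowFiles);
    }

    public static void main(String[] args) {
        System.out.println(run(20_000, 5_000, 0));     // pages=4, flowFiles=[20000]
        System.out.println(run(20_000, 5_000, 7_000)); // pages=4, flowFiles=[7000, 7000, 6000]
    }
}
```

With Max Rows Per Flow File at 0, the whole 20k-row result lands in a single FlowFile even though four pages are fetched; changing the fetch size alters only the page count.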



[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2023-04-05 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709144#comment-17709144
 ] 

ASF subversion and git services commented on NIFI-5642:
---

Commit 4f22f0985a58f6e8a36edea9f6c907f758945357 in nifi's branch 
refs/heads/support/nifi-1.x from aglotero
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=4f22f0985a ]

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is 
reached
NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is 
reached
Fixed checkstyle error
Delete build.sh
Delete local build file
NIFI-5642 : letting fetch_size to control the Cassandra data flow creating a 
new MAX_ROWS_PER_FLOW_FILE parameter
Fixed checkstyle error: no more import java.util.*
Fixed missing imports
NIFI-5642: added REL_ORIGINAL relationship in order to allow incremental commit
Addressing comments from code review
Adjustments on timestamp datatype formatting
Created the OUTPUT_BATCH_SIZE property
Code review adjustments
NIFI-5642: update after rebase
NIFI-5642: addressing PR comments
NIFI-5642: adding in integration test, fixing race condition
NIFI-5642: remove log4j2

This closes #6848
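The commit log only names the mechanism: an `original` relationship "in order to allow incremental commit" plus an `OUTPUT_BATCH_SIZE` property. A plausible reading, assumed here by analogy with the Output Batch Size property of NiFi's ExecuteSQL, is that the session is committed after every N output FlowFiles so downstream processors can start early, with 0 meaning a single commit at the end. The input FlowFile would go to `original` first because a NiFi session can only be committed once every FlowFile in it has been transferred or removed. This sketch models only the commit cadence, using a hypothetical `FakeSession` rather than NiFi's `ProcessSession`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of committing a session every outputBatchSize FlowFiles (0 = commit once at the end).
 * FakeSession is a hypothetical stand-in, not NiFi's ProcessSession.
 */
public final class BatchCommitSketch {

    private BatchCommitSketch() {
    }

    /** Records how many output FlowFiles each commit released downstream. */
    static final class FakeSession {
        private int pending;
        private final List<Integer> commits = new ArrayList<>();

        void transfer() {
            pending++;
        }

        void commit() {
            commits.add(pending);
            pending = 0;
        }

        List<Integer> commits() {
            return commits;
        }
    }

    /** Emits flowFileCount output FlowFiles and returns the size of each committed batch. */
    public static List<Integer> emit(int flowFileCount, int outputBatchSize) {
        FakeSession session = new FakeSession();
        // Assumption: the input FlowFile has already been routed to "original" at this point,
        // so intermediate commits do not leave it unaccounted for.
        int sinceLastCommit = 0;
        for (int i = 0; i < flowFileCount; i++) {
            session.transfer();
            sinceLastCommit++;
            if (outputBatchSize > 0 && sinceLastCommit == outputBatchSize) {
                session.commit(); // this batch becomes visible downstream now
                sinceLastCommit = 0;
            }
        }
        if (sinceLastCommit > 0 || session.commits().isEmpty()) {
            session.commit(); // final commit for the remainder (or the only commit)
        }
        return session.commits();
    }

    public static void main(String[] args) {
        System.out.println(emit(10, 3)); // [3, 3, 3, 1]
        System.out.println(emit(10, 0)); // [10]
    }
}
```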

Signed-off-by: Mike Thomsen 


> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -
>
> Key: NIFI-5642
> URL: https://issues.apache.org/jira/browse/NIFI-5642
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.7.1
>Reporter: André Gomes Lamas Otero
>Assignee: Levi Lentz
>Priority: Major
> Fix For: 1.latest, 2.latest
>
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> When I use QueryCassandra with the fetch_size parameter, I expect that as soon 
> as my reader reaches the fetch_size, the processor outputs some data to be 
> processed by the next processor; instead, QueryCassandra reads all the data 
> and only then outputs the flow files.
> I'll start working on a patch for this situation and would appreciate any 
> suggestions.





[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2023-04-05 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709090#comment-17709090
 ] 

ASF subversion and git services commented on NIFI-5642:
---

Commit d80a19e2308ce630b079b440c7f8452363ee9939 in nifi's branch 
refs/heads/main from aglotero
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=d80a19e230 ]

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is 
reached
NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is 
reached
Fixed checkstyle error
Delete build.sh
Delete local build file
NIFI-5642 : letting fetch_size to control the Cassandra data flow creating a 
new MAX_ROWS_PER_FLOW_FILE parameter
Fixed checkstyle error: no more import java.util.*
Fixed missing imports
NIFI-5642: added REL_ORIGINAL relationship in order to allow incremental commit
Addressing comments from code review
Adjustments on timestamp datatype formatting
Created the OUTPUT_BATCH_SIZE property
Code review adjustments
NIFI-5642: update after rebase
NIFI-5642: addressing PR comments
NIFI-5642: adding in integration test, fixing race condition
NIFI-5642: remove log4j2

This closes #6848

Signed-off-by: Mike Thomsen 


> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -
>
> Key: NIFI-5642
> URL: https://issues.apache.org/jira/browse/NIFI-5642
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.7.1
>Reporter: André Gomes Lamas Otero
>Assignee: Levi Lentz
>Priority: Major
> Fix For: 1.latest, 2.latest
>
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> When I use QueryCassandra with the fetch_size parameter, I expect that as soon 
> as my reader reaches the fetch_size, the processor outputs some data to be 
> processed by the next processor; instead, QueryCassandra reads all the data 
> and only then outputs the flow files.
> I'll start working on a patch for this situation and would appreciate any 
> suggestions.





[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705170#comment-16705170
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237965549
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session remain open the entire time the processor is running
 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws IOException {
-try {
-logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = 
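In this hunk, `inputFlowFile` can still be null after the early `return` when every incoming connection is a self-loop (`hasIncomingConnection()` is true but `hasNonLoopConnection()` is false), so the unconditional `inputFlowFile.getAttributes()` call would throw a NullPointerException. A null-safe version of the guard, sketched with hypothetical stand-ins (`FlowFileStub` and two boolean flags in place of NiFi's `FlowFile` and `ProcessContext`), might look like this:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
 * Null-safe sketch of the incoming-FlowFile guard at the top of onTrigger.
 * FlowFileStub and the boolean flags are hypothetical stand-ins for NiFi types.
 */
public final class IncomingGuardSketch {

    private IncomingGuardSketch() {
    }

    /** Minimal stand-in for a FlowFile: just its attributes. */
    public static final class FlowFileStub {
        private final Map<String, String> attributes;

        public FlowFileStub(Map<String, String> attributes) {
            this.attributes = attributes;
        }

        public Map<String, String> getAttributes() {
            return attributes;
        }
    }

    /**
     * Returns empty when onTrigger should return early (no input yet, but upstream
     * processors are connected); otherwise the attributes to evaluate expressions
     * against, which are empty when running without an input FlowFile.
     */
    public static Optional<Map<String, String>> attributesFor(boolean hasIncomingConnection,
                                                              boolean hasNonLoopConnection,
                                                              FlowFileStub input) {
        if (hasIncomingConnection && input == null && hasNonLoopConnection) {
            return Optional.empty(); // wait for a FlowFile from another processor
        }
        // Only self-loops (or no incoming connection): input may legitimately be null here,
        // so check it before dereferencing.
        return Optional.of(input == null ? Collections.<String, String>emptyMap() : input.getAttributes());
    }

    public static void main(String[] args) {
        System.out.println(attributesFor(true, true, null).isPresent());  // false: wait for input
        System.out.println(attributesFor(true, false, null).isPresent()); // true: run without input
    }
}
```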

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702071#comment-16702071
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237138928
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
 .build();
 
+// Relationships
 public static final Relationship REL_SUCCESS = new Relationship.Builder()
 .name("success")
 .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
 .build();
+
+static final Relationship REL_ORIGINAL = new Relationship.Builder()
+.name("original")
+.description("All input FlowFiles that are part of a successful query execution go here.")
--- End diff --

Maybe `CQL operation` here instead of `query execution`; see comments below.


> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -
>
> Key: NIFI-5642
> URL: https://issues.apache.org/jira/browse/NIFI-5642
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.7.1
>Reporter: André Gomes Lamas Otero
>Priority: Major
>
> When I use QueryCassandra with the fetch_size parameter, I expect that as soon 
> as my reader reaches the fetch_size, the processor outputs some data to be 
> processed by the next processor; instead, QueryCassandra reads all the data 
> and only then outputs the flow files.
> I'll start working on a patch for this situation and would appreciate any 
> suggestions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702074#comment-16702074
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237152109
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -269,11 +313,17 @@ public void process(final OutputStream out) throws IOException {
 // cap the error limit at 10, format the messages, and don't include the stack trace (it is displayed by the
 // logger message above).
 getLogger().error(nhae.getCustomMessage(10, true, false));
+if (fileToProcess == null) {
+fileToProcess = session.create();
+}
 fileToProcess = session.penalize(fileToProcess);
 session.transfer(fileToProcess, REL_RETRY);
-
 } catch (final QueryExecutionException qee) {
+//session.rollback();
--- End diff --

There are a few spots of commented-out code; please remove them if they're 
not necessary, thanks!




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702073#comment-16702073
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237147586
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702075#comment-16702075
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237141917
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702076#comment-16702076
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237138338
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
 .build();
 
+// Relationships
 public static final Relationship REL_SUCCESS = new Relationship.Builder()
 .name("success")
 .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
 .build();
+
+static final Relationship REL_ORIGINAL = new Relationship.Builder()
+.name("original")
+.description("All input FlowFiles that are part of a successful query execution go here.")
+.build();
+
 public static final Relationship REL_FAILURE = new Relationship.Builder()
 .name("failure")
-.description("A FlowFile is transferred to this relationship if the operation failed.")
+.description("CQL query execution failed.")
--- End diff --

I'd prefer `CQL operation` rather than `query`, as PutCassandra deals more 
with statements than queries, and this base processor could be used to do other 
things. Technically a processor wouldn't have to use CQL, although I'm not sure 
of other ways to interact with Cassandra, and I think users would be 
comfortable with the term CQL meaning Cassandra, even if it didn't use CQL.




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702072#comment-16702072
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237138678
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
 .build();
 
+// Relationships
 public static final Relationship REL_SUCCESS = new Relationship.Builder()
 .name("success")
 .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
 .build();
+
+static final Relationship REL_ORIGINAL = new Relationship.Builder()
+.name("original")
+.description("All input FlowFiles that are part of a successful query execution go here.")
+.build();
+
 public static final Relationship REL_FAILURE = new Relationship.Builder()
 .name("failure")
-.description("A FlowFile is transferred to this relationship if the operation failed.")
+.description("CQL query execution failed.")
 .build();
+
 public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
-.description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting "
-+ "it again may succeed.")
+.description("A FlowFile is transferred to this relationship if the query cannot be completed but attempting "
--- End diff --

I tend to like `operation` here (see above), and I like that you replaced 
`it` with `operation` for clarity.




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681894#comment-16681894
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r232374856
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -192,15 +205,17 @@ public void onScheduled(final ProcessContext context) {
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
 FlowFile fileToProcess = null;
+FlowFile inputFlowFile = null;
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+session.remove(inputFlowFile);
--- End diff --

I've created an "original" relationship to keep the same logic as other 
processors.




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681897#comment-16681897
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r232375017
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session remain open the entire time the processor is running
 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws IOException {
-try {
-logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = 
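Both the removed and the added code in the hunk above apply the same rule: bound the wait only when `queryTimeout` is positive, otherwise wait indefinitely. A JDK-only sketch of that rule, using `CompletableFuture` as a stand-in for the driver's future (`awaitResult` is a hypothetical helper, not a DataStax driver method):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutRule {
    // Waits at most `timeout` when it is positive; otherwise blocks until the result arrives.
    static <T> T awaitResult(CompletableFuture<T> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (timeout > 0) {
            return future.get(timeout, unit);
        }
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(awaitResult(CompletableFuture.completedFuture("rows"), 0, TimeUnit.MILLISECONDS));
        try {
            // A future that never completes: only the bounded wait can return.
            awaitResult(new CompletableFuture<String>(), 50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out after 50 ms");
        }
    }
}
```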

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-11-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671879#comment-16671879
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r230119046
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session remain open the entire time the processor is running
 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws IOException {
-try {
-logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = 
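In the hunk above, `nrOfRows` is updated from inside an anonymous `OutputStreamCallback`, and Java only lets an anonymous class capture local variables that are final or effectively final; an `AtomicLong` serves as a final reference to a mutable counter. A JDK-only sketch of that pattern, with a hypothetical `StreamCallback` interface in place of NiFi's `OutputStreamCallback`:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class CallbackCounter {
    // Hypothetical stand-in for NiFi's OutputStreamCallback.
    interface StreamCallback {
        void process(OutputStream out) throws IOException;
    }

    // Runs the callback against the target stream, roughly as a session write would.
    static void write(OutputStream target, StreamCallback callback) throws IOException {
        callback.process(target);
    }

    static long writeRows(List<String> rows, OutputStream target) throws IOException {
        // A plain `long` local could not be reassigned inside the anonymous class,
        // so the count lives in a final AtomicLong, as with nrOfRows above.
        final AtomicLong nrOfRows = new AtomicLong(0L);
        write(target, new StreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                for (String row : rows) {
                    out.write((row + "\n").getBytes(StandardCharsets.UTF_8));
                    nrOfRows.incrementAndGet();
                }
            }
        });
        return nrOfRows.get();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = writeRows(List.of("a", "b", "c"), out);
        System.out.println(n + " rows, " + out.size() + " bytes");
    }
}
```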

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665154#comment-16665154
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228522688
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session remain open the entire time the processor is running
 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws IOException {
-try {
-logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = 
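In the hunk above, when every incoming connection is a self-loop and no FlowFile is queued, the early `return` is skipped and `inputFlowFile.getAttributes()` is then called on `null`. A minimal null-safe sketch, using a hypothetical `FlowFileLike` interface in place of NiFi's `FlowFile`:

```java
import java.util.Collections;
import java.util.Map;

final class SafeAttributes {
    // Hypothetical stand-in for NiFi's FlowFile; only the attribute map is modelled.
    interface FlowFileLike {
        Map<String, String> getAttributes();
    }

    // Returns the input's attributes, or an empty map when the trigger had no input FlowFile.
    static Map<String, String> attributesOf(FlowFileLike input) {
        return input == null ? Collections.emptyMap() : input.getAttributes();
    }

    public static void main(String[] args) {
        FlowFileLike withInput = () -> Map.of("filename", "query-1");
        System.out.println(attributesOf(withInput)); // {filename=query-1}
        System.out.println(attributesOf(null));      // {}
    }
}
```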

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665156#comment-16665156
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228522827
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -400,77 +478,87 @@ public static long convertToJsonStream(final ResultSet rs, final OutputStream ou
 outStream.write("{\"results\":[".getBytes(charset));
 final ColumnDefinitions columnDefinitions = rs.getColumnDefinitions();
 long nrOfRows = 0;
+long rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
+
 if (columnDefinitions != null) {
-do {
-
-// Grab the ones we have
-int rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
-if (rowsAvailableWithoutFetching == 0) {
-// Get more
-if (timeout <= 0 || timeUnit == null) {
-rs.fetchMoreResults().get();
-} else {
-rs.fetchMoreResults().get(timeout, timeUnit);
-}
+
+// Grab the ones we have
+if (rowsAvailableWithoutFetching == 0) {
+// Get more
+if (timeout <= 0 || timeUnit == null) {
+rs.fetchMoreResults().get();
+} else {
+rs.fetchMoreResults().get(timeout, timeUnit);
 }
+rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
+}
 
-for (Row row : rs) {
-if (nrOfRows != 0) {
+if(maxRowsPerFlowFile == 0){
+maxRowsPerFlowFile = rowsAvailableWithoutFetching;
+}
+Row row;
+while(nrOfRows < maxRowsPerFlowFile){
+try {
+row = rs.iterator().next();
+}catch (NoSuchElementException nsee){
+//nrOfRows -= 1;
--- End diff --

True!
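A later comment on this issue notes that a Max Rows Per Flow File of zero is meant to put every row in one FlowFile, whereas the hunk above replaces zero with `rowsAvailableWithoutFetching` (the rows currently buffered). A JDK-only sketch of the intended semantics, where a plain `Iterator` stands in for the driver's result set and `RowBatcher.batch` is a hypothetical helper; it also uses `hasNext()` rather than catching `NoSuchElementException` to detect the end of the rows:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class RowBatcher {
    // Splits rows into batches of at most maxRowsPerBatch rows.
    // maxRowsPerBatch == 0 means "no limit": every row lands in a single batch,
    // regardless of how many rows happen to be buffered at the time.
    static <T> List<List<T>> batch(Iterator<T> rows, long maxRowsPerBatch) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        while (rows.hasNext()) {            // end of data detected without exceptions
            current.add(rows.next());
            if (maxRowsPerBatch > 0 && current.size() == maxRowsPerBatch) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);           // trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            rows.add(i);
        }
        System.out.println(batch(rows.iterator(), 4)); // [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
        System.out.println(batch(rows.iterator(), 0)); // [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
    }
}
```

In the processor, each inner list would correspond to one output FlowFile, which could be transferred as soon as it is full.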




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664441#comment-16664441
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228371811
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session remain open the entire time the processor is running
 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws IOException {
-try {
-logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = 

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664273#comment-16664273
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228325897
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -33,7 +32,7 @@
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
-import org.apache.commons.text.StringEscapeUtils;
+import org.apache.commons.lang3.StringEscapeUtils;
--- End diff --

I think we're on the verge of using Apache Commons Text instead of Commons 
Lang 3, so maybe consider keeping this the way it is?




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664272#comment-16664272
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228325643
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +132,25 @@
 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
 .build();
 
+// Relationships
 public static final Relationship REL_SUCCESS = new Relationship.Builder()
 .name("success")
-.description("A FlowFile is transferred to this relationship if the operation completed successfully.")
+.description("Successfully created FlowFile from CQL query result set.")
--- End diff --

This relationship is reused by PutCassandraQL as well, where there is no 
result set or query per se (it's a statement). That's why the doc is so 
generic. If you'd like the doc to be more specific, you can create a 
REL_SUCCESS relationship in QueryCassandra using 
`new Relationship.Builder().from(AbstractCassandraProcessor.REL_SUCCESS).description("Your description override").build()`
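The suggestion above is a copy-then-override builder: start from the shared relationship and change only the description. A JDK-only sketch of that pattern with a hypothetical `Rel` class, so it runs without the NiFi API; it illustrates the idea and does not show NiFi's actual `Relationship.Builder`:

```java
final class Rel {
    private final String name;
    private final String description;

    private Rel(Builder builder) {
        this.name = builder.name;
        this.description = builder.description;
    }

    String name() { return name; }

    String description() { return description; }

    static final class Builder {
        private String name;
        private String description;

        // Copy every field from an existing instance so callers override only what differs.
        Builder from(Rel other) {
            this.name = other.name;
            this.description = other.description;
            return this;
        }

        Builder name(String name) { this.name = name; return this; }

        Builder description(String description) { this.description = description; return this; }

        Rel build() { return new Rel(this); }
    }

    public static void main(String[] args) {
        // Shared, generic relationship (like REL_SUCCESS in the abstract base class).
        Rel shared = new Builder().name("success")
                .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
                .build();
        // Processor-specific copy: same name, more specific description; `shared` is unchanged.
        Rel querySuccess = new Builder().from(shared)
                .description("Successfully created FlowFile from CQL query result set.")
                .build();
        System.out.println(querySuccess.name() + ": " + querySuccess.description());
    }
}
```

Because the copy keeps the same name, both processors still expose a relationship called "success", while each documents it in its own terms.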




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664275#comment-16664275
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228328178
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session remain open the entire time the processor is running
 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws IOException {
-try {
-logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = 
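The removed branch above calls `convertToJsonStream(resultSet, out, charset, ...)`; the `convertToJsonStream` hunk elsewhere in this thread shows it writing `{"results":[` and checking `nrOfRows != 0` inside the row loop. A JDK-only sketch of that comma-separated streaming layout, assuming rows arrive as already-serialized JSON strings and that the array is closed with `]}` (neither detail is shown in the quoted diff; `JsonRowsWriter` is hypothetical):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

final class JsonRowsWriter {
    // Streams {"results":[row,row,...]} in the given charset and returns the number of rows written.
    static long writeResults(Iterator<String> rows, OutputStream out, Charset charset) throws IOException {
        out.write("{\"results\":[".getBytes(charset));
        long nrOfRows = 0;
        while (rows.hasNext()) {
            if (nrOfRows != 0) {
                out.write(",".getBytes(charset)); // separator before every row except the first
            }
            out.write(rows.next().getBytes(charset));
            nrOfRows++;
        }
        out.write("]}".getBytes(charset));
        return nrOfRows;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = writeResults(List.of("{\"id\":1}", "{\"id\":2}").iterator(), out, StandardCharsets.UTF_8);
        System.out.println(n + " rows: " + new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```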

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664274#comment-16664274
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228327507
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session remain open the entire time the processor is running
 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws IOException {
-try {
-logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = 
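The comments in the hunk above describe when `onTrigger` should bail out before running the query. The same decision as a small pure function, a sketch with hypothetical names rather than NiFi's `ProcessContext` API:

```java
final class TriggerGate {
    // Mirrors the early-return check in onTrigger:
    //  - no incoming connections: the processor acts as a source and runs on its schedule;
    //  - otherwise it skips the run only when no FlowFile was pulled AND at least one
    //    incoming connection comes from another processor (not a self-loop).
    static boolean shouldRun(boolean hasIncomingConnection, boolean hasNonLoopConnection, boolean gotFlowFile) {
        if (!hasIncomingConnection) {
            return true;
        }
        return gotFlowFile || !hasNonLoopConnection;
    }

    public static void main(String[] args) {
        System.out.println(shouldRun(false, false, false)); // true: source mode
        System.out.println(shouldRun(true, true, false));   // false: wait for upstream input
        System.out.println(shouldRun(true, false, false));  // true: self-loops only
        System.out.println(shouldRun(true, true, true));    // true: input available
    }
}
```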

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664276#comment-16664276
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228331847
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map<String, String> attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session remain open the entire time the processor is running
 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws IOException {
-try {
-logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = 
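
For reference, these hunks use one timeout convention in two places: the removed `getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS)` call, and the `fetchMoreResults().get(timeout, timeUnit)` branch in the `convertToJsonStream` hunk. A non-positive timeout means block until the result arrives. Otherwise the wait gives up after the configured period. The sketch below reproduces only that convention with plain `java.util.concurrent` types rather than the Cassandra driver. `BoundedWait` and `await` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {

    // Waits on a future the way the fetchMoreResults() branch does (sketch only):
    // a non-positive timeout, or a null unit, means "block until done";
    // otherwise give up with a TimeoutException after the configured period.
    public static <T> T await(Future<T> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (timeout <= 0 || unit == null) {
            return future.get();
        }
        return future.get(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        // An already-completed future returns immediately in both modes.
        CompletableFuture<String> done = CompletableFuture.completedFuture("page-1");
        System.out.println(await(done, 0, TimeUnit.MILLISECONDS));
        System.out.println(await(done, 100, TimeUnit.MILLISECONDS));

        // A future that never completes only fails when a timeout is configured.
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            await(never, 50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

Running `main` prints `page-1` twice and then `timed out`. A `TimeoutException` from `get` does not stop the underlying work, so a caller may also want to call `future.cancel(true)`.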

[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664277#comment-16664277
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228332165
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -400,77 +478,87 @@ public static long convertToJsonStream(final ResultSet rs, final OutputStream ou
 outStream.write("{\"results\":[".getBytes(charset));
 final ColumnDefinitions columnDefinitions = rs.getColumnDefinitions();
 long nrOfRows = 0;
+long rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
+
 if (columnDefinitions != null) {
-do {
-
-// Grab the ones we have
-int rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
-if (rowsAvailableWithoutFetching == 0) {
-// Get more
-if (timeout <= 0 || timeUnit == null) {
-rs.fetchMoreResults().get();
-} else {
-rs.fetchMoreResults().get(timeout, timeUnit);
-}
+
+// Grab the ones we have
+if (rowsAvailableWithoutFetching == 0) {
+// Get more
+if (timeout <= 0 || timeUnit == null) {
+rs.fetchMoreResults().get();
+} else {
+rs.fetchMoreResults().get(timeout, timeUnit);
 }
+rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
+}
 
-for (Row row : rs) {
-if (nrOfRows != 0) {
+if(maxRowsPerFlowFile == 0){
+maxRowsPerFlowFile = rowsAvailableWithoutFetching;
+}
+Row row;
+while(nrOfRows < maxRowsPerFlowFile){
+try {
+row = rs.iterator().next();
+}catch (NoSuchElementException nsee){
+//nrOfRows -= 1;
--- End diff --

This is commented out here but active in the Avro version above; I assume 
they need to be the same?
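
Here is a minimal sketch of one way to avoid the mismatch raised here. Guarding the loop with `hasNext()`, instead of catching `NoSuchElementException`, means the row count never needs a `nrOfRows -= 1` correction. Treating a limit of 0 as "no limit", rather than as the number of rows currently buffered, keeps the batch size independent of driver paging. The sketch uses a plain `java.util.Iterator` in place of the driver's `ResultSet`, whose iterator is assumed here to fetch further pages on demand. `RowBatcher`, `nextBatch` and `split` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class RowBatcher {

    // Drains at most maxRowsPerFlowFile rows into one batch (one output FlowFile).
    // A limit of 0 means "no limit": keep going until the iterator is exhausted,
    // regardless of how many rows happen to be buffered at the moment.
    public static <T> List<T> nextBatch(Iterator<T> rows, long maxRowsPerFlowFile) {
        List<T> batch = new ArrayList<>();
        // hasNext() guards every next() call, so no exception handling or
        // after-the-fact count correction is required.
        while (rows.hasNext() && (maxRowsPerFlowFile == 0 || batch.size() < maxRowsPerFlowFile)) {
            batch.add(rows.next());
        }
        return batch;
    }

    // Splits all remaining rows into batches; an empty input yields no batches.
    public static <T> List<List<T>> split(Iterator<T> rows, long maxRowsPerFlowFile) {
        List<List<T>> batches = new ArrayList<>();
        while (rows.hasNext()) {
            batches.add(nextBatch(rows, maxRowsPerFlowFile));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            rows.add(i);
        }
        System.out.println(split(rows.iterator(), 5).size());
        System.out.println(split(rows.iterator(), 0).size());
    }
}
```

With 12 rows, a limit of 5 yields batches of 5, 5 and 2 rows, so `main` prints `3`. A limit of 0 yields a single batch holding all 12 rows, so `main` then prints `1`.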


> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -
>
> Key: NIFI-5642
> URL: https://issues.apache.org/jira/browse/NIFI-5642
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.7.1
>Reporter: André Gomes Lamas Otero
>Priority: Major
>
> When using QueryCassandra with the fetch_size parameter, I expected that as 
> soon as my reader reached the fetch_size, the processor would output some data 
> to be processed by the next processor; instead, QueryCassandra reads all the 
> data and only then outputs the flow files.
> I'll start working on a patch for this situation; I'd appreciate any 
> suggestions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-18 Thread JIRA


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655839#comment-16655839
 ] 

André Gomes Lamas Otero commented on NIFI-5642:
---

Hi Folks!
Is anyone available to review my PR?
Thanks!



[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646709#comment-16646709
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on the issue:

https://github.com/apache/nifi/pull/3051
  
Hi @mattyb149!
This latest commit contains all the changes from the previous commits and 
addresses your comments.
Regards,




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643531#comment-16643531
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero commented on the issue:

https://github.com/apache/nifi/pull/3051
  
Thanks for the comments @mattyb149!
Your understanding of the intent of this change is correct. I'll take a look 
at QueryDatabaseTable to understand how to add an "Output Batch Size" 
parameter to the QueryCassandra processor.
Regards,
André
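
QueryDatabaseTable's Output Batch Size property sets how many output FlowFiles are queued before the process session is committed. Committing is what releases FlowFiles to the downstream relationship, which is the behaviour this issue asks for. The minimal, NiFi-free sketch below computes that commit schedule. It assumes that 0 means "commit once, after all rows", which is how QueryDatabaseTable describes the property. `OutputBatching` and `commitPoints` are hypothetical names, and a comment marks where a real processor would call `session.commit()`.

```java
import java.util.ArrayList;
import java.util.List;

public class OutputBatching {

    // Returns how many FlowFiles each successive session commit would release.
    // outputBatchSize == 0 is treated as "commit once at the end" (assumption
    // modelled on QueryDatabaseTable's Output Batch Size description).
    public static List<Integer> commitPoints(int totalFlowFiles, int outputBatchSize) {
        List<Integer> commits = new ArrayList<>();
        int pending = 0;
        for (int i = 0; i < totalFlowFiles; i++) {
            pending++; // one more FlowFile transferred but not yet committed
            if (outputBatchSize > 0 && pending == outputBatchSize) {
                commits.add(pending); // a real processor would call session.commit() here
                pending = 0;
            }
        }
        if (pending > 0) {
            commits.add(pending); // final commit for whatever is left over
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println(commitPoints(7, 3));
        System.out.println(commitPoints(7, 0));
    }
}
```

`commitPoints(7, 3)` returns `[3, 3, 1]`, so downstream processors see FlowFiles after the third and sixth ones are ready. `commitPoints(7, 0)` returns `[7]`, which matches the "everything at the end" behaviour described in the issue.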




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643447#comment-16643447
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r223701761
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -192,15 +205,17 @@ public void onScheduled(final ProcessContext context) {
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
 FlowFile fileToProcess = null;
+FlowFile inputFlowFile = null;
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+session.remove(inputFlowFile);
--- End diff --

I don't follow the logic here. If there is a flow file in the incoming 
connection, this appears to remove it from the session, and fileToProcess will 
always be null, which means we couldn't use flow file attributes to evaluate 
properties such as CQL Query, Query Timeout, Charset, etc. Another effect is 
that provenance/lineage will not be preserved for incoming files, as the 
incoming file will be removed, and any flow files generated by this processor 
will appear to have been created here, so you can't track that a flow file "A" 
came in and, as a result, generated flow files X, Y, Z.
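
In the NiFi API, the usual way to keep the lineage described above is to create each output with `ProcessSession.create(FlowFile parent)`. That call gives the new FlowFile the parent's attributes and records it as a child of that parent. The sketch below models only that parent/child relationship in plain Java (Java 16+ for records). `FlowFileModel`, `createChild` and the `cql.table` attribute are hypothetical and are not NiFi types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LineageSketch {

    // Hypothetical model of a FlowFile: an id, the id of the FlowFile it was
    // derived from (null for a root), and its own copy of the attributes.
    public record FlowFileModel(String id, String parentId, Map<String, String> attributes) {}

    // Mirrors the idea behind ProcessSession.create(parent): the child points
    // back to its parent and starts with a copy of the parent's attributes.
    public static FlowFileModel createChild(FlowFileModel parent, String childId) {
        return new FlowFileModel(childId, parent.id(), new HashMap<>(parent.attributes()));
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("cql.table", "users"); // hypothetical attribute an EL property might read
        FlowFileModel input = new FlowFileModel("A", null, attrs);

        List<FlowFileModel> outputs = new ArrayList<>();
        for (String id : List.of("X", "Y", "Z")) {
            outputs.add(createChild(input, id));
        }
        // Each output can be traced back to "A" and still carries its attributes.
        for (FlowFileModel ff : outputs) {
            System.out.println(ff.id() + " <- " + ff.parentId() + " " + ff.attributes());
        }
    }
}
```

Running `main` prints `X <- A {cql.table=users}`, followed by the same line for `Y` and `Z`. This is the "A came in and generated X, Y, Z" trail that is lost when the incoming FlowFile is removed.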




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643446#comment-16643446
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r223700156
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -132,6 +131,20 @@
 
 private final static List<PropertyDescriptor> propertyDescriptors;
 
+// Relationships
+public static final Relationship REL_SUCCESS = new Relationship.Builder()
--- End diff --

These relationships are available (albeit with more generic descriptions) 
in the AbstractCassandraProcessor class. If we need more specific documentation 
then we should remove the common relationships and provide specific 
documentation/relationships in all Cassandra processors.
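
Here is a minimal sketch of the arrangement suggested here. The shared relationships are declared once in the abstract base class, and a subclass reuses them and adds only what is specific to it. A later revision of this PR, quoted earlier in this thread, transfers the incoming FlowFile to `REL_ORIGINAL`. `Rel`, `BaseCassandraProcessor` and `QueryProcessor` are hypothetical stand-ins, not NiFi's `Relationship` or `AbstractCassandraProcessor`, and the descriptions are placeholders.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class SharedRelationships {

    // Hypothetical stand-in for a NiFi Relationship: a name plus a description.
    public record Rel(String name, String description) {}

    // Base class declares the relationships every processor shares, once,
    // with deliberately generic (placeholder) descriptions.
    public abstract static class BaseCassandraProcessor {
        public static final Rel REL_SUCCESS = new Rel("success", "Placeholder: operation succeeded");
        public static final Rel REL_FAILURE = new Rel("failure", "Placeholder: operation failed");

        public Set<Rel> getRelationships() {
            Set<Rel> rels = new LinkedHashSet<>();
            rels.add(REL_SUCCESS);
            rels.add(REL_FAILURE);
            return rels;
        }
    }

    // Subclass reuses the inherited constants and adds only its own relationship.
    public static class QueryProcessor extends BaseCassandraProcessor {
        public static final Rel REL_ORIGINAL = new Rel("original", "Placeholder: the incoming FlowFile");

        @Override
        public Set<Rel> getRelationships() {
            Set<Rel> rels = super.getRelationships();
            rels.add(REL_ORIGINAL);
            return Collections.unmodifiableSet(rels);
        }
    }

    public static void main(String[] args) {
        for (Rel r : new QueryProcessor().getRelationships()) {
            System.out.println(r.name());
        }
    }
}
```

Running `main` prints `success`, `failure` and `original`, in that order. If processor-specific descriptions become necessary, the reviewer's alternative applies: drop the shared constants and declare them in each processor.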




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642517#comment-16642517
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

GitHub user aglotero opened a pull request:

https://github.com/apache/nifi/pull/3051

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size 
is reached

…size is reached

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [X] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [X] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [X] Is your initial contribution a single, squashed commit?

### For code changes:
- [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [X] Have you written or updated unit tests to verify your changes?
- [X] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [X] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [X] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [X] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aglotero/nifi NIFI-5642

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3051.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3051


commit 584251d8af5069c5150d6cbc9ba3eac9b6f0
Author: aglotero 
Date:   2018-10-08T21:20:26Z

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size 
is reached






[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642477#comment-16642477
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

Github user aglotero closed the pull request at:

https://github.com/apache/nifi/pull/3050




[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642468#comment-16642468
 ] 

ASF GitHub Bot commented on NIFI-5642:
--

GitHub user aglotero opened a pull request:

https://github.com/apache/nifi/pull/3050

NIFI-5642 : QueryCassandra processor : output FlowFiles as soon fetch_size 
is reached

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [X] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [X] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [X] Is your initial contribution a single, squashed commit?

### For code changes:
- [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [X] Have you written or updated unit tests to verify your changes?
- [X] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aglotero/nifi NIFI-5642

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3050.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3050


commit 76b82e23a16a3ca0b2556d5d3f54140f446c0d9d
Author: Andy LoPresto 
Date:   2018-07-07T05:07:46Z

NIFI-5370 removed custom hostname verifier implementation from 
OkHttpReplicationClient (default handles wildcard certs).
This closes #2869.

Signed-off-by: Mark Payne 

commit 333146b3fecef50cc16751cdf2fe7bcddcc7e03d
Author: Mark Bean 
Date:   2018-07-05T19:35:15Z

NIFI-5368 controller services validated prior to enabling; referenced 
controller services must be enabled for referencing component to be valid (mock 
framework)

This closes #2873.

Signed-off-by: Mark Payne 

commit 54bb511e579a00535c976159fa2903385af74f77
Author: Mark Payne 
Date:   2018-07-09T15:50:06Z

NIFI-5377: Addressed issue of infinite recursion when enabling/disabling 
controller services if there is a recursive loop (i.e., Service A references 
Service B references Service A). This closes #2847

Signed-off-by: Matt Gilman 

commit 260bc29e1014a2fd21c3775e27f434756e13e0ee
Author: Bryan Bende 
Date:   2018-06-25T14:36:55Z

NIFI-5316 Fixed array handling for Avro that comes from Parquet's Avro 
reader

Signed-off-by: zenfenan 

commit e09ab9b69a3bd21c09118dd2a2c9b8bd4e091bae
Author: Mark Payne 
Date:   2018-06-29T13:25:57Z

NIFI-5361: When submitting many processors to start, calculate the 'timeout 
timestamp' immediately before calling @OnScheduled method, after the task has 
been scheduled to run, instead of before the task has a chance to run.

This closes #2831

commit cfdb0de8f83c3e7d732cac6a5d18b2f1631d20e5
Author: Mark Payne 
Date:   2018-07-02T14:00:38Z

NIFI-5362: When a processor is terminated and has no more active threads, 
ensure that we set this.hasActiveThreads = false

Signed-off-by: Pierre Villard 

This closes #2832.

commit 7b28b914cd6ca4c5f0d566a8726ca02ebd38f3c2
Author: Peter Toth 
Date:   2018-06-07T10:13:21Z

NIFI-5278: fixes JSON escaping of code

Change-Id: I2cb0e6c658d4a0f2aad9c4aab9201a3334ee54df

NIFI-5278: adds Apache Commons Text to NOTICE

Change-Id: I8185239b0a888c16159b18f13d6682ba350cc766

NIFI-5278: adds tests

Change-Id: I9286ac71bc7399e5bdc1e6602609b5e8829db27e

NIFI-5278: fixes review findings

Change-Id: I292c93dae877cf1cd146f3897b7e132b6afac801
Signed-off-by: Matthew Burgess