[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798751#comment-17798751 ]

Matt Burgess commented on NIFI-5642:

The "maxRowsPerFlowFile" variable is changed to match the number of rows available without fetching (default is 5000), so only 5000 rows get put into the FlowFile even if the Max Rows Per Flow File property is set to zero. I have a test table with 20k rows; if I run QueryCassandra once, I get 4 output FlowFiles when I should only get one. There's a logic error or two in there that need to be fixed, but since it has been released as of 1.22.0 and 2.0.0-M1, I will open a new Jira to fix them.

> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -------------------------------------------------------------------------
>
>                 Key: NIFI-5642
>                 URL: https://issues.apache.org/jira/browse/NIFI-5642
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter: André Gomes Lamas Otero
>            Assignee: Levi Lentz
>            Priority: Major
>             Fix For: 2.0.0-M1, 1.22.0
>
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> When using QueryCassandra with the fetch_size parameter, I expected that as soon as my reader reached the fetch_size, the processor would output some data to be processed by the next processor, but QueryCassandra reads all the data and then outputs the FlowFiles.
> I'll start working on a patch for this situation; I'd appreciate any suggestions.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
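The arithmetic in the comment above can be made concrete with a minimal sketch in plain Java. It does not use the NiFi or Cassandra driver APIs, the class and method names are hypothetical, and it assumes the behavior is exactly as described: the row limit ends up equal to the number of rows available without fetching (5000 by default) instead of the configured Max Rows Per Flow File value.

```java
// Illustrative model only; not code from QueryCassandra.
class FetchSizeOverrideSketch {

    // FlowFiles expected when the configured limit is honored.
    // Assumption taken from the comment: a limit of 0 means "no limit", so one FlowFile.
    static int flowFilesWhenLimitHonored(int totalRows, int maxRowsProperty) {
        if (maxRowsProperty <= 0) {
            return 1;
        }
        return (totalRows + maxRowsProperty - 1) / maxRowsProperty; // ceiling division
    }

    // FlowFiles produced when the limit has been overwritten with the number of
    // rows available without fetching (one page of results).
    static int flowFilesWhenLimitOverwritten(int totalRows, int rowsAvailableWithoutFetching) {
        if (rowsAvailableWithoutFetching <= 0) {
            throw new IllegalArgumentException("page size must be positive");
        }
        return (totalRows + rowsAvailableWithoutFetching - 1) / rowsAvailableWithoutFetching;
    }

    public static void main(String[] args) {
        int totalRows = 20_000;
        System.out.println(flowFilesWhenLimitHonored(totalRows, 0));        // prints 1
        System.out.println(flowFilesWhenLimitOverwritten(totalRows, 5000)); // prints 4
    }
}
```

With the 20k-row table and the default page size of 5000, the overwritten limit yields the 4 FlowFiles reported, while honoring a limit of zero yields the single FlowFile that was expected.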
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796476#comment-17796476 ]

André Gomes Lamas Otero commented on NIFI-5642:
-----------------------------------------------

[~mattyb149] check the logic at AbstractCassandraProcessor. It is precisely what you are describing. The code updates from levilentz (https://github.com/apache/nifi/pull/6848) were made on top of the old 2018 history, hence the misleading commit messages from the old history.
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796449#comment-17796449 ]

Matt Burgess commented on NIFI-5642:

That's not what Fetch Size is for; it's the number of rows the client asks for at a time. Max Rows Per Flow File is what should be used to send FlowFiles downstream when they're "full". Any commits should be reverted, as this will change the behavior other users expect.
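The distinction drawn in this comment can be sketched in plain Java. This is an illustrative model rather than the processor's implementation: the class and method names are hypothetical, and treating a limit of zero as "no limit" is an assumption based on the discussion in this thread. Fetch Size is deliberately absent from the method signature, since it only affects how many rows the client requests per round trip, not how the rows are divided among FlowFiles.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; not code from QueryCassandra.
class MaxRowsSketch {

    // Returns the number of rows placed in each output FlowFile.
    // Assumption: maxRowsPerFlowFile <= 0 means "no limit" (everything in one FlowFile).
    static List<Integer> rowsPerFlowFile(int totalRows, int maxRowsPerFlowFile) {
        List<Integer> sizes = new ArrayList<>();
        if (maxRowsPerFlowFile <= 0) {
            sizes.add(totalRows);
            return sizes;
        }
        // A FlowFile is "full" once it holds maxRowsPerFlowFile rows; the last one may be smaller.
        for (int remaining = totalRows; remaining > 0; remaining -= maxRowsPerFlowFile) {
            sizes.add(Math.min(remaining, maxRowsPerFlowFile));
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(rowsPerFlowFile(20_000, 0));     // prints [20000]
        System.out.println(rowsPerFlowFile(20_000, 8_000)); // prints [8000, 8000, 4000]
    }
}
```

Changing the page size would alter neither result in this model, which is the behavior existing users would rely on.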
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709144#comment-17709144 ]

ASF subversion and git services commented on NIFI-5642:
-------------------------------------------------------

Commit 4f22f0985a58f6e8a36edea9f6c907f758945357 in nifi's branch refs/heads/support/nifi-1.x from aglotero
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=4f22f0985a ]

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is reached

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is reached
Fixed checkstyle error
Delete build.sh
Delete local build file
NIFI-5642 : letting fetch_size to control the Cassandra data flow creating a new MAX_ROWS_PER_FLOW_FILE parameter
Fixed checkstyle error: no more import java.util.*
Fixed missing imports
NIFI-5642: added REL_ORIGINAL relationship in order to allow incremental commit
Addressing comments from code review
Adjustments on timestamp datatype formatting
Created the OUTPUT_BATCH_SIZE property
Code review adjustments
NIFI-5642: update after rebase
NIFI-5642: addressing PR comments
NIFI-5642: adding in integration test, fixing race condition
NIFI-5642: remove log4j2

This closes #6848

Signed-off-by: Mike Thomsen

> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -------------------------------------------------------------------------
>
>                 Key: NIFI-5642
>                 URL: https://issues.apache.org/jira/browse/NIFI-5642
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter: André Gomes Lamas Otero
>            Assignee: Levi Lentz
>            Priority: Major
>             Fix For: 1.latest, 2.latest
>
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> When using QueryCassandra with the fetch_size parameter, I expected that as soon as my reader reached the fetch_size, the processor would output some data to be processed by the next processor, but QueryCassandra reads all the data and then outputs the FlowFiles.
> I'll start working on a patch for this situation; I'd appreciate any suggestions.
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17709090#comment-17709090 ]

ASF subversion and git services commented on NIFI-5642:
-------------------------------------------------------

Commit d80a19e2308ce630b079b440c7f8452363ee9939 in nifi's branch refs/heads/main from aglotero
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=d80a19e230 ]

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is reached

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is reached
Fixed checkstyle error
Delete build.sh
Delete local build file
NIFI-5642 : letting fetch_size to control the Cassandra data flow creating a new MAX_ROWS_PER_FLOW_FILE parameter
Fixed checkstyle error: no more import java.util.*
Fixed missing imports
NIFI-5642: added REL_ORIGINAL relationship in order to allow incremental commit
Addressing comments from code review
Adjustments on timestamp datatype formatting
Created the OUTPUT_BATCH_SIZE property
Code review adjustments
NIFI-5642: update after rebase
NIFI-5642: addressing PR comments
NIFI-5642: adding in integration test, fixing race condition
NIFI-5642: remove log4j2

This closes #6848

Signed-off-by: Mike Thomsen
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705170#comment-16705170 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user aglotero commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r237965549

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile inputFlowFile = null;
             FlowFile fileToProcess = null;
    +
    +        Map attributes = null;
    +
             if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +            inputFlowFile = session.get();
                 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
                 // However, if we have no FlowFile and we have connections coming from other Processors, then
                 // we know that we should run only if we have a FlowFile.
    -            if (fileToProcess == null && context.hasNonLoopConnection()) {
    +            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                     return;
                 }
    +
    +            attributes = inputFlowFile.getAttributes();
             }
             final ComponentLog logger = getLogger();
    -        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    -        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
    +        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
             final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
    -        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
    +        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
             final StopWatch stopWatch = new StopWatch(true);
    -        if (fileToProcess == null) {
    -            fileToProcess = session.create();
    +        if(inputFlowFile != null){
    +            session.transfer(inputFlowFile, REL_ORIGINAL);
             }
             try {
                 // The documentation for the driver recommends the session remain open the entire time the processor is running
                 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
                 final Session connectionSession = cassandraSession.get();
    -            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
    +            final ResultSet resultSet;
    +
    +            if (queryTimeout > 0) {
    +                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
    +            }else{
    +                resultSet = connectionSession.execute(selectQuery);
    +            }
    +
                 final AtomicLong nrOfRows = new AtomicLong(0L);
    -            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    try {
    -                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
    -                        final ResultSet resultSet;
    -                        if (queryTimeout > 0) {
    -                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
    -                            }
    -                        } else {
    -                            resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702071#comment-16702071 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r237138928

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
    @@ -132,17 +134,25 @@
                 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
                 .build();
    +    // Relationships
         public static final Relationship REL_SUCCESS = new Relationship.Builder()
                 .name("success")
                 .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
                 .build();
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .name("original")
    +            .description("All input FlowFiles that are part of a successful query execution go here.")
    --- End diff --

    Maybe `CQL operation` here instead of `query execution`; see comments below.

> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -------------------------------------------------------------------------
>
>                 Key: NIFI-5642
>                 URL: https://issues.apache.org/jira/browse/NIFI-5642
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter: André Gomes Lamas Otero
>            Priority: Major
>
> When using QueryCassandra with the fetch_size parameter, I expected that as soon as my reader reached the fetch_size, the processor would output some data to be processed by the next processor, but QueryCassandra reads all the data and then outputs the FlowFiles.
> I'll start working on a patch for this situation; I'd appreciate any suggestions.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702074#comment-16702074 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r237152109

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -269,11 +313,17 @@ public void process(final OutputStream out) throws IOException {
                 // cap the error limit at 10, format the messages, and don't include the stack trace (it is displayed by the
                 // logger message above).
                 getLogger().error(nhae.getCustomMessage(10, true, false));
    +            if (fileToProcess == null) {
    +                fileToProcess = session.create();
    +            }
                 fileToProcess = session.penalize(fileToProcess);
                 session.transfer(fileToProcess, REL_RETRY);
    -
             } catch (final QueryExecutionException qee) {
    +            //session.rollback();
    --- End diff --

    There are a few spots of commented-out code; please remove them if not necessary, thanks!
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702073#comment-16702073 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r237147586

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702075#comment-16702075 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r237141917

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702076#comment-16702076 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r237138338

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
    @@ -132,17 +134,25 @@
                 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
                 .build();
    +    // Relationships
         public static final Relationship REL_SUCCESS = new Relationship.Builder()
                 .name("success")
                 .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
                 .build();
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .name("original")
    +            .description("All input FlowFiles that are part of a successful query execution go here.")
    +            .build();
    +
         public static final Relationship REL_FAILURE = new Relationship.Builder()
                 .name("failure")
    -            .description("A FlowFile is transferred to this relationship if the operation failed.")
    +            .description("CQL query execution failed.")
    --- End diff --

    I'd prefer `CQL operation` rather than `query`, as PutCassandra is more of a statement than a query, and this base processor could be used to do other things. Technically a processor wouldn't have to use CQL, although I'm not sure of other ways to interact with Cassandra, and I think the user would be comfortable with the term CQL to mean Cassandra, even if it didn't use CQL.
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702072#comment-16702072 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r237138678

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
    @@ -132,17 +134,25 @@
                 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
                 .build();
    +    // Relationships
         public static final Relationship REL_SUCCESS = new Relationship.Builder()
                 .name("success")
                 .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
                 .build();
    +
    +    static final Relationship REL_ORIGINAL = new Relationship.Builder()
    +            .name("original")
    +            .description("All input FlowFiles that are part of a successful query execution go here.")
    +            .build();
    +
         public static final Relationship REL_FAILURE = new Relationship.Builder()
                 .name("failure")
    -            .description("A FlowFile is transferred to this relationship if the operation failed.")
    +            .description("CQL query execution failed.")
                 .build();
    +
         public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
    -            .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting "
    -                    + "it again may succeed.")
    +            .description("A FlowFile is transferred to this relationship if the query cannot be completed but attempting "
    --- End diff --

    I tend to like `operation` here (see above), and I like that you replaced `it` with `operation` for clarity.
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681894#comment-16681894 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user aglotero commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r232374856

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -192,15 +205,17 @@ public void onScheduled(final ProcessContext context) {
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
             FlowFile fileToProcess = null;
    +        FlowFile inputFlowFile = null;
             if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +            inputFlowFile = session.get();
                 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
                 // However, if we have no FlowFile and we have connections coming from other Processors, then
                 // we know that we should run only if we have a FlowFile.
    -            if (fileToProcess == null && context.hasNonLoopConnection()) {
    +            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                     return;
                 }
    +            session.remove(inputFlowFile);
    --- End diff --

    I've created an "original" relationship to maintain the logic from other processors.
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681897#comment-16681897 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user aglotero commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r232375017

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671879#comment-16671879 ] ASF GitHub Bot commented on NIFI-5642: -- Github user aglotero commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r230119046 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665154#comment-16665154 ] ASF GitHub Bot commented on NIFI-5642: -- Github user aglotero commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228522688 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665156#comment-16665156 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user aglotero commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r228522827

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -400,77 +478,87 @@ public static long convertToJsonStream(final ResultSet rs, final OutputStream ou
                 outStream.write("{\"results\":[".getBytes(charset));
                 final ColumnDefinitions columnDefinitions = rs.getColumnDefinitions();
                 long nrOfRows = 0;
    +            long rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
    +
                 if (columnDefinitions != null) {
    -                do {
    -
    -                    // Grab the ones we have
    -                    int rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
    -                    if (rowsAvailableWithoutFetching == 0) {
    -                        // Get more
    -                        if (timeout <= 0 || timeUnit == null) {
    -                            rs.fetchMoreResults().get();
    -                        } else {
    -                            rs.fetchMoreResults().get(timeout, timeUnit);
    -                        }
    +
    +                // Grab the ones we have
    +                if (rowsAvailableWithoutFetching == 0) {
    +                    // Get more
    +                    if (timeout <= 0 || timeUnit == null) {
    +                        rs.fetchMoreResults().get();
    +                    } else {
    +                        rs.fetchMoreResults().get(timeout, timeUnit);
                         }
    +                    rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
    +                }
    -                    for (Row row : rs) {
    -                        if (nrOfRows != 0) {
    +                if(maxRowsPerFlowFile == 0){
    +                    maxRowsPerFlowFile = rowsAvailableWithoutFetching;
    +                }
    +                Row row;
    +                while(nrOfRows < maxRowsPerFlowFile){
    +                    try {
    +                        row = rs.iterator().next();
    +                    }catch (NoSuchElementException nsee){
    +                        //nrOfRows -= 1;
    --- End diff --

    True!
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664441#comment-16664441 ] ASF GitHub Bot commented on NIFI-5642: -- Github user aglotero commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228371811 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664273#comment-16664273 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r228325897

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -33,7 +32,7 @@ import org.apache.avro.generic.GenericDatumWriter;
     import org.apache.avro.generic.GenericRecord;
     import org.apache.avro.io.DatumWriter;
    -import org.apache.commons.text.StringEscapeUtils;
    +import org.apache.commons.lang3.StringEscapeUtils;
    --- End diff --

    I think we're on the verge of using Apache Commons Text instead of Commons Lang 3, maybe consider keeping this the way it is?
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664272#comment-16664272 ]

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r228325643

    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
    @@ -132,17 +132,25 @@
                 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
                 .build();
    +    // Relationships
         public static final Relationship REL_SUCCESS = new Relationship.Builder()
                 .name("success")
    -            .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
    +            .description("Successfully created FlowFile from CQL query result set.")
    --- End diff --

    This relationship is reused by PutCassandraQL as well, where there is no result set or query per se (it's a statement). That's why the doc is so generic. If you'd like to have the doc be more specific, you can create a REL_SUCCESS relationship in QueryCassandra using `new Relationship.Builder().from(AbstractCassandraProcessor.REL_SUCCESS).description("Your description override").build()`
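The copy-and-override idea in the comment above can be illustrated without the NiFi API. The following is a minimal sketch only: `Rel` and its `Builder` are hypothetical stand-ins written for this example, not NiFi's `Relationship` class, and whether the NiFi version in use actually exposes a `from(...)` method on `Relationship.Builder` is not confirmed here. It shows a shared, generically worded relationship in a base class and a processor-specific copy that keeps the name but replaces the description.

```java
final class RelationshipOverrideSketch {

    // Hypothetical stand-in for a relationship; NOT NiFi's Relationship class.
    static final class Rel {
        final String name;
        final String description;

        private Rel(String name, String description) {
            this.name = name;
            this.description = description;
        }

        static final class Builder {
            private String name;
            private String description;

            // Copies every field from an existing relationship (assumed behaviour of "from").
            Builder from(Rel other) {
                this.name = other.name;
                this.description = other.description;
                return this;
            }

            Builder name(String name) {
                this.name = name;
                return this;
            }

            Builder description(String description) {
                this.description = description;
                return this;
            }

            Rel build() {
                return new Rel(name, description);
            }
        }
    }

    // Generic wording, shared by every processor that extends the base class.
    static final Rel BASE_SUCCESS = new Rel.Builder()
            .name("success")
            .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
            .build();

    // Processor-specific copy: same name, more specific description.
    static final Rel QUERY_SUCCESS = new Rel.Builder()
            .from(BASE_SUCCESS)
            .description("Successfully created FlowFile from CQL query result set.")
            .build();

    public static void main(String[] args) {
        System.out.println(BASE_SUCCESS.name + ": " + BASE_SUCCESS.description);
        System.out.println(QUERY_SUCCESS.name + ": " + QUERY_SUCCESS.description);
    }
}
```

With this shape the base class keeps its generic description for processors such as PutCassandraQL, and only the query processor sees the more specific text.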
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664275#comment-16664275 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228328178 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664274#comment-16664274 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228327507 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664276#comment-16664276 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228331847 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664277#comment-16664277 ]

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228332165

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -400,77 +478,87 @@ public static long convertToJsonStream(final ResultSet rs, final OutputStream ou
 outStream.write("{\"results\":[".getBytes(charset));
 final ColumnDefinitions columnDefinitions = rs.getColumnDefinitions();
 long nrOfRows = 0;
+long rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
+
 if (columnDefinitions != null) {
-do {
-
-// Grab the ones we have
-int rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
-if (rowsAvailableWithoutFetching == 0) {
-// Get more
-if (timeout <= 0 || timeUnit == null) {
-rs.fetchMoreResults().get();
-} else {
-rs.fetchMoreResults().get(timeout, timeUnit);
-}
+
+// Grab the ones we have
+if (rowsAvailableWithoutFetching == 0) {
+// Get more
+if (timeout <= 0 || timeUnit == null) {
+rs.fetchMoreResults().get();
+} else {
+rs.fetchMoreResults().get(timeout, timeUnit);
 }
+rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
+}
-for (Row row : rs) {
-if (nrOfRows != 0) {
+if(maxRowsPerFlowFile == 0){
+maxRowsPerFlowFile = rowsAvailableWithoutFetching;
+}
+Row row;
+while(nrOfRows < maxRowsPerFlowFile){
+try {
+row = rs.iterator().next();
+}catch (NoSuchElementException nsee){
+//nrOfRows -= 1;
--- End diff --

This is commented out here but active in the Avro version above, I assume they need to be the same?
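
The batching behaviour under review here (emit a batch whenever the per-FlowFile row limit is reached, and treat a limit of 0 as "no limit") can be sketched without the Cassandra driver. This is a minimal illustration under stated assumptions, not the code from the pull request: `RowBatcher` and `batches` are hypothetical names, and a plain `java.util.Iterator` stands in for the driver's `ResultSet`. The sketch takes a single iterator up front and checks `hasNext()` instead of catching `NoSuchElementException`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of NiFi: splits a row iterator into batches.
public final class RowBatcher {

    private RowBatcher() {
    }

    // maxRowsPerBatch == 0 means "no limit": every row lands in a single batch.
    public static <T> List<List<T>> batches(final Iterator<T> rows, final long maxRowsPerBatch) {
        if (maxRowsPerBatch < 0) {
            throw new IllegalArgumentException("maxRowsPerBatch must be >= 0");
        }
        final List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        while (rows.hasNext()) {
            current.add(rows.next());
            // Close the batch as soon as the limit is reached (only when a limit is set).
            if (maxRowsPerBatch > 0 && current.size() == maxRowsPerBatch) {
                result.add(current);
                current = new ArrayList<>();
            }
        }
        // Keep the trailing partial batch, if any.
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

In a processor, each inner list would correspond to one output FlowFile; how the rows are fetched and serialized is outside the scope of this sketch.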
> QueryCassandra processor : output FlowFiles as soon fetch_size is reached > - > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 >Reporter: André Gomes Lamas Otero >Priority: Major > > When I'm using QueryCassandra alongside with fetch_size parameter I expected > that as soon my reader reaches the fetch_size the processor outputs some data > to be processed by the next processor, but QueryCassandra reads all the data, > then output the flow files. > I'll start to work on a patch for this situation, I'll appreciate any > suggestion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655839#comment-16655839 ] André Gomes Lamas Otero commented on NIFI-5642: --- Hi Folks! Is anyone available to review my PR ? Thanks!
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646709#comment-16646709 ] ASF GitHub Bot commented on NIFI-5642: -- Github user aglotero commented on the issue: https://github.com/apache/nifi/pull/3051 Hi @mattyb149! This last commit contains all changes from the last commits, addressing your comments. Regards,
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643531#comment-16643531 ] ASF GitHub Bot commented on NIFI-5642: -- Github user aglotero commented on the issue: https://github.com/apache/nifi/pull/3051 Thanks for the comments @mattyb149! Your understanding is correct regarding the intent of this change, I'll take a look on QueryDatabaseTable to understand how to add an "Output Batch Size" parameter on QueryCassandra processor. Regards, André
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643447#comment-16643447 ]

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r223701761

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -192,15 +205,17 @@ public void onScheduled(final ProcessContext context) {
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
 FlowFile fileToProcess = null;
+FlowFile inputFlowFile = null;
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+session.remove(inputFlowFile);
--- End diff --

I don't follow the logic here. If there is a flow file in the incoming connection, this appears to remove it from the session, and fileToProcess will always be null, which means we couldn't use flow file attributes to evaluate properties such as CQL Query, Query Timeout, Charset, etc.

Another effect is that provenance/lineage will not be preserved for incoming files, as the incoming file will be removed, and any flow files generated by this processor will appear to have been created here, so you can't track that a flow file "A" came in and, as a result, generated flow files X,Y,Z.
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643446#comment-16643446 ]

ASF GitHub Bot commented on NIFI-5642:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r223700156

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -132,6 +131,20 @@ private final static List propertyDescriptors;
+// Relationships
+public static final Relationship REL_SUCCESS = new Relationship.Builder()
--- End diff --

These relationships are available (albeit with more generic descriptions) in the AbstractCassandraProcessor class. If we need more specific documentation then we should remove the common relationships and provide specific documentation/relationships in all Cassandra processors.

> QueryCassandra processor : output FlowFiles as soon fetch_size is reached > - > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 >Reporter: André Gomes Lamas Otero >Priority: Major > > When I'm using QueryCassandra alongside with fetch_size parameter I expected > that as soon my reader reaches the fetch_size the processor outputs some data > to be processed by the next processor, but QueryCassandra reads all the data, > then output the flow files. > I'll start to work on a patch for this situation, I'll appreciate any > suggestion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642517#comment-16642517 ]

ASF GitHub Bot commented on NIFI-5642:
--

GitHub user aglotero opened a pull request:

https://github.com/apache/nifi/pull/3051

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is reached

…size is reached

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [X] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [X] Is your initial contribution a single, squashed commit?

### For code changes:
- [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [X] Have you written or updated unit tests to verify your changes?
- [X] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [X] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [X] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [X] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aglotero/nifi NIFI-5642

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3051.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3051

commit 584251d8af5069c5150d6cbc9ba3eac9b6f0
Author: aglotero
Date: 2018-10-08T21:20:26Z

NIFI-5642: QueryCassandra processor : output FlowFiles as soon fetch_size is reached

> QueryCassandra processor : output FlowFiles as soon fetch_size is reached > - > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 >Reporter: André Gomes Lamas Otero >Priority: Major > > When I'm using QueryCassandra alongside with fetch_size parameter I expected > that as soon my reader reaches the fetch_size the processor outputs some data > to be processed by the next processor, but QueryCassandra reads all the data, > then output the flow files. > I'll start to work on a patch for this situation, I'll appreciate any > suggestion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642477#comment-16642477 ] ASF GitHub Bot commented on NIFI-5642: -- Github user aglotero closed the pull request at: https://github.com/apache/nifi/pull/3050
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642468#comment-16642468 ]

ASF GitHub Bot commented on NIFI-5642:
--

GitHub user aglotero opened a pull request:

https://github.com/apache/nifi/pull/3050

NIFI-5642 : QueryCassandra processor : output FlowFiles as soon fetch_size is reached

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [X] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [X] Is your initial contribution a single, squashed commit?

### For code changes:
- [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [X] Have you written or updated unit tests to verify your changes?
- [X] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aglotero/nifi NIFI-5642

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3050.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3050

commit 76b82e23a16a3ca0b2556d5d3f54140f446c0d9d
Author: Andy LoPresto
Date: 2018-07-07T05:07:46Z

NIFI-5370 removed custom hostname verifier implementation from OkHttpReplicationClient (default handles wildcard certs).

This closes #2869.

Signed-off-by: Mark Payne

commit 333146b3fecef50cc16751cdf2fe7bcddcc7e03d
Author: Mark Bean
Date: 2018-07-05T19:35:15Z

NIFI-5368 controller services validated prior to enabling; referenced controller services must be enabled for referencing component to be valid (mock framework)

This closes #2873.

Signed-off-by: Mark Payne

commit 54bb511e579a00535c976159fa2903385af74f77
Author: Mark Payne
Date: 2018-07-09T15:50:06Z

NIFI-5377: Addressed issue of infinite recursion when enabling/disabling controller services if there is a recursive loop (i.e., Service A references Service B references Service A).

This closes #2847

Signed-off-by: Matt Gilman

commit 260bc29e1014a2fd21c3775e27f434756e13e0ee
Author: Bryan Bende
Date: 2018-06-25T14:36:55Z

NIFI-5316 Fixed array handling for Avro that comes from Parquet's Avro reader

Signed-off-by: zenfenan

commit e09ab9b69a3bd21c09118dd2a2c9b8bd4e091bae
Author: Mark Payne
Date: 2018-06-29T13:25:57Z

NIFI-5361: When submitting many processors to start, calculate the 'timeout timestamp' immediately before calling @OnScheduled method, after the task has been scheduled to run, instead of before the task has a chance to run.

This closes #2831

commit cfdb0de8f83c3e7d732cac6a5d18b2f1631d20e5
Author: Mark Payne
Date: 2018-07-02T14:00:38Z

NIFI-5362: When a processor is terminated and has no more active threads, ensure that we set this.hasActiveThreads = false

Signed-off-by: Pierre Villard

This closes #2832.

commit 7b28b914cd6ca4c5f0d566a8726ca02ebd38f3c2
Author: Peter Toth
Date: 2018-06-07T10:13:21Z

NIFI-5278: fixes JSON escaping of code

Change-Id: I2cb0e6c658d4a0f2aad9c4aab9201a3334ee54df

NIFI-5278: adds Apache Commons Text to NOTICE

Change-Id: I8185239b0a888c16159b18f13d6682ba350cc766

NIFI-5278: adds tests

Change-Id: I9286ac71bc7399e5bdc1e6602609b5e8829db27e

NIFI-5278: fixes review findings

Change-Id: I292c93dae877cf1cd146f3897b7e132b6afac801

Signed-off-by: Matthew Burgess