[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r190665372

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -51,14 +53,22 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
+@WritesAttributes( value = {
+@WritesAttribute(attribute = "progress.estimate", description = "The estimated total documents that match the query. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.start", description = "Where the first part of the segment is in the total result set. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.end", description = "Where the last part of the segment is in the total result set. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.index", description = "When results are written one-by-one to flowfiles, this is is set to indicate estimated progress. Written if estimation is enabled.")
--- End diff --

Done.
---
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r190658948

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java ---
@@ -67,20 +69,23 @@
 @Before
 public void setup() {
-runner = TestRunners.newTestRunner(GetMongo.class);
+runner = TestRunners.newTestRunner(TestGetMongo.class);
 runner.setVariable("uri", MONGO_URI);
 runner.setVariable("db", DB_NAME);
 runner.setVariable("collection", COLLECTION_NAME);
 runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
 runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
 runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
-runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
+runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.GM_TRUE);
 runner.setIncomingConnection(false);
+runner.setValidateExpressionUsage(true);
--- End diff --

Removed.
---
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r190658775

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
@@ -123,6 +124,7 @@
 .displayName("Batch Size")
 .description("The number of elements returned from the server in one batch.")
 .required(false)
+.expressionLanguageSupported(true)
--- End diff --

Done.
---
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r190658748

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
@@ -115,6 +115,7 @@
 .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
 .required(false)
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(true)
--- End diff --

Done.
---
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r190626960

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
@@ -115,6 +115,7 @@
 .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
 .required(false)
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(true)
--- End diff --

Can you use the scope?
---
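A note on the scope request: in NiFi 1.7.0 and later, `PropertyDescriptor.Builder.expressionLanguageSupported` accepts an `ExpressionLanguageScope` (`NONE`, `VARIABLE_REGISTRY`, or `FLOWFILE_ATTRIBUTES`), and the boolean overload is deprecated. GetMongo is `INPUT_FORBIDDEN`, so there is never an incoming flowfile whose attributes could be referenced, which points to `VARIABLE_REGISTRY` for these properties. The sketch below is not NiFi's Expression Language engine; it is a hypothetical, dependency-free illustration of what that scope means: a value such as `${uri}` is resolved only against a variable map, like the one the integration test fills with `runner.setVariable`.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of variable-registry-scoped Expression Language:
// only plain ${name} references are handled, and they are looked up in a map.
// No flowfile attributes exist here, mirroring an INPUT_FORBIDDEN processor.
final class VariableRegistrySketch {
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([A-Za-z0-9_.\\-]+)\\}");

    private VariableRegistrySketch() {}

    static String resolve(String value, Map<String, String> variables) {
        Matcher matcher = REFERENCE.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            // In this sketch, an unknown variable resolves to an empty string.
            String replacement = variables.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

`VariableRegistrySketch.resolve("${uri}", vars)` returns whatever `vars` maps `uri` to. The real evaluator also supports functions such as `${uri:toUpper()}`, which this sketch deliberately ignores.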
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r190627122

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -51,14 +53,22 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
+@WritesAttributes( value = {
+@WritesAttribute(attribute = "progress.estimate", description = "The estimated total documents that match the query. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.start", description = "Where the first part of the segment is in the total result set. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.end", description = "Where the last part of the segment is in the total result set. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.index", description = "When results are written one-by-one to flowfiles, this is is set to indicate estimated progress. Written if estimation is enabled.")
--- End diff --

Typo is still here
---
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r190629391

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java ---
@@ -67,20 +69,23 @@
 @Before
 public void setup() {
-runner = TestRunners.newTestRunner(GetMongo.class);
+runner = TestRunners.newTestRunner(TestGetMongo.class);
 runner.setVariable("uri", MONGO_URI);
 runner.setVariable("db", DB_NAME);
 runner.setVariable("collection", COLLECTION_NAME);
 runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
 runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
 runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
-runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
+runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.GM_TRUE);
 runner.setIncomingConnection(false);
+runner.setValidateExpressionUsage(true);
--- End diff --

I believe that's the default here, probably best to not have this line
---
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r190627002

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---
@@ -123,6 +124,7 @@
 .displayName("Batch Size")
 .description("The number of elements returned from the server in one batch.")
 .required(false)
+.expressionLanguageSupported(true)
--- End diff --

Can you use the scope?
---
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r172216534

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -121,34 +136,53 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
-static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-.name("Batch Size")
-.description("The number of elements returned from the server in one batch")
+static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+.name("Fetch Size")
--- End diff --

I missed that it was `name` and not `displayName`. Maybe what should be done here is to revert that change, and then have the commits happen either after each flowfile (when results are grouped into big flowfiles) or after each batch, as defined in that property, for the 1:1 result/flowfile option.
---
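The revised behavior proposed here can be expressed as a small decision rule. This is a hypothetical sketch, not code from the PR: `CommitPolicySketch.shouldCommit` and its parameters are invented names, and it assumes that an unset Results Per Flowfile (modelled as `0`) or a value of `1` means one result per flowfile. Results left over after the last progressive commit would still be committed, because `AbstractProcessor` commits the session after `onTrigger` returns.

```java
// Hypothetical decision rule for the commit behavior proposed above.
final class CommitPolicySketch {
    private CommitPolicySketch() {}

    /**
     * @param progressiveCommits the "Commit After Each Batch" setting
     * @param resultsPerFlowFile 0 when unset; 0 or 1 is treated as one result per flowfile
     * @param batchSize          the Batch Size property
     * @param resultsWritten     results written so far, including the flowfile just emitted
     * @return true if the session should be committed now
     */
    static boolean shouldCommit(boolean progressiveCommits, int resultsPerFlowFile,
                                int batchSize, long resultsWritten) {
        if (!progressiveCommits) {
            return false;
        }
        if (resultsPerFlowFile > 1) {
            // Grouped mode: each flowfile is already a large batch, so commit after every one.
            return true;
        }
        // One result per flowfile: commit after every batchSize results.
        return batchSize > 0 && resultsWritten % batchSize == 0;
    }
}
```

With Batch Size 10 in 1:1 mode, this commits after results 10, 20, 30, and so on; in grouped mode it commits after every flowfile regardless of Batch Size.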
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r172198724

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -121,34 +136,53 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
-static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-.name("Batch Size")
-.description("The number of elements returned from the server in one batch")
+static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+.name("Fetch Size")
--- End diff --

Changing a property's name will cause it to be recognized as a "different" property. This will cause all existing flows containing GetMongo to become invalid (the "Batch Size" property will show up at the bottom containing the existing value, but the framework will claim the processor is invalid because Batch Size is not a supported property). That's why we use displayName() for the user-friendly name, so we can change it at will. I realize you did not have that luxury here, but we would still have to keep the name("Batch Size") and add displayName("Fetch Size"). This will be confusing in the code (until we change it for real, perhaps in NiFi 2.0?) but can be accompanied by documentation.

Also, I'm still a little leery of changing the existing property to "fetch" vs "batch", then using "batch" in a different context in the added property. I would like to get some input from others on this as well.
---
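The `name`/`displayName` distinction can be illustrated with a toy model. The classes below are hypothetical and far simpler than NiFi's `PropertyDescriptor`; they only capture the point made above: a saved flow stores property values keyed by `name`, so renaming the `name` orphans the stored "Batch Size" value, while changing only the `displayName` does not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, stripped-down property descriptor: name is the identity,
// displayName is only a UI label.
final class DescriptorSketch {
    final String name;
    final String displayName;

    DescriptorSketch(String name, String displayName) {
        this.name = name;
        this.displayName = displayName;
    }
}

final class SavedFlowCheckSketch {
    private SavedFlowCheckSketch() {}

    // Returns the keys of saved property values that no supported descriptor claims.
    static List<String> unsupported(Map<String, String> savedValues, List<DescriptorSketch> supported) {
        Set<String> names = new HashSet<>();
        for (DescriptorSketch descriptor : supported) {
            names.add(descriptor.name); // matched by name, never by displayName
        }
        List<String> orphaned = new ArrayList<>();
        for (String key : savedValues.keySet()) {
            if (!names.contains(key)) {
                orphaned.add(key);
            }
        }
        return orphaned;
    }
}
```

Checking a saved `{"Batch Size": "500"}` against a descriptor named "Fetch Size" reports "Batch Size" as orphaned (the invalid-processor case described above); a descriptor with `name` "Batch Size" and `displayName` "Fetch Size" reports nothing.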
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r172019186

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -129,26 +144,44 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
-.name("results-per-flowfile")
-.displayName("Results Per FlowFile")
-.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
-.required(false)
-.expressionLanguageSupported(true)
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
+.name("results-per-flowfile")
+.displayName("Results Per FlowFile")
+.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+.required(false)
+.expressionLanguageSupported(true)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.build();
+static final PropertyDescriptor ESTIMATE_PROGRESS = new PropertyDescriptor.Builder()
+.name("estimate-progress")
+.displayName("Estimate Progress")
+.description("If enabled, a count query will be run first, using the configured query, and attributes will be added to each flowfile showing how far they are into the result set.")
+.required(true)
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.allowableValues(GM_TRUE, GM_FALSE)
+.defaultValue(GM_FALSE.getValue())
+.build();
+static final PropertyDescriptor PROGRESSIVE_COMMITS = new PropertyDescriptor.Builder()
+.name("progressive-commits")
+.displayName("Commit After Each Batch")
--- End diff --

I spent a little time working over the documentation on the properties, and I think I struck the right balance of renaming and redocumenting. As an added benefit, I think I made the Batch Size property (now Fetch Size) much clearer to newbs.
---
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r171302727

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -129,26 +144,44 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
-.name("results-per-flowfile")
-.displayName("Results Per FlowFile")
-.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
-.required(false)
-.expressionLanguageSupported(true)
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
+.name("results-per-flowfile")
+.displayName("Results Per FlowFile")
+.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+.required(false)
+.expressionLanguageSupported(true)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.build();
+static final PropertyDescriptor ESTIMATE_PROGRESS = new PropertyDescriptor.Builder()
+.name("estimate-progress")
+.displayName("Estimate Progress")
+.description("If enabled, a count query will be run first, using the configured query, and attributes will be added to each flowfile showing how far they are into the result set.")
+.required(true)
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.allowableValues(GM_TRUE, GM_FALSE)
+.defaultValue(GM_FALSE.getValue())
+.build();
+static final PropertyDescriptor PROGRESSIVE_COMMITS = new PropertyDescriptor.Builder()
+.name("progressive-commits")
+.displayName("Commit After Each Batch")
--- End diff --

It works in coordination with the Results Per FlowFile property. The idea is to emulate ExecuteSQL: after a batch of X results has been built up in the processor and sent out as a flowfile, it commits.

I'm tempted to change the Batch Size property's display name to something like Query Fetch Size. I think Results Per Flowfile is probably even clearer than Batch Size for this. Thoughts?
---
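The ExecuteSQL-style behavior described here (build up a batch, write it to a flowfile, commit) can be sketched without NiFi on the classpath. `SessionSketch` is a hypothetical stand-in for `ProcessSession` that only tracks which flowfile contents have been transferred and committed; the grouping loop illustrates the idea under those assumptions and is not GetMongo's actual `onTrigger`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a ProcessSession: transferred content stays
// pending until commit() makes it visible downstream.
final class SessionSketch {
    final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    int commits = 0;

    void transfer(String flowFileContent) {
        pending.add(flowFileContent);
    }

    void commit() {
        committed.addAll(pending);
        pending.clear();
        commits++;
    }
}

final class ProgressiveCommitSketch {
    private ProgressiveCommitSketch() {}

    // Groups results into JSON-array "flowfiles" of up to resultsPerFlowFile
    // documents and commits after each one when progressiveCommits is true.
    static void run(Iterator<String> results, int resultsPerFlowFile,
                    boolean progressiveCommits, SessionSketch session) {
        List<String> batch = new ArrayList<>();
        while (results.hasNext()) {
            batch.add(results.next());
            if (batch.size() == resultsPerFlowFile) {
                emit(batch, progressiveCommits, session);
            }
        }
        if (!batch.isEmpty()) {
            emit(batch, progressiveCommits, session); // final, possibly shorter, batch
        }
        session.commit(); // stands in for the framework's commit after onTrigger returns
    }

    private static void emit(List<String> batch, boolean progressiveCommits, SessionSketch session) {
        session.transfer("[" + String.join(",", batch) + "]");
        batch.clear();
        if (progressiveCommits) {
            session.commit();
        }
    }
}
```

With five results `a` to `e`, a Results Per Flowfile of 2, and progressive commits enabled, the three flowfiles `[a,b]`, `[c,d]`, and `[e]` are each committed as soon as they are written (three progressive commits plus the final one); with the option disabled, all three appear together in a single commit at the end.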
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r171280456

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -129,26 +144,44 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
-.name("results-per-flowfile")
-.displayName("Results Per FlowFile")
-.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
-.required(false)
-.expressionLanguageSupported(true)
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
+.name("results-per-flowfile")
+.displayName("Results Per FlowFile")
+.description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+.required(false)
+.expressionLanguageSupported(true)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.build();
+static final PropertyDescriptor ESTIMATE_PROGRESS = new PropertyDescriptor.Builder()
+.name("estimate-progress")
+.displayName("Estimate Progress")
+.description("If enabled, a count query will be run first, using the configured query, and attributes will be added to each flowfile showing how far they are into the result set.")
+.required(true)
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.allowableValues(GM_TRUE, GM_FALSE)
+.defaultValue(GM_FALSE.getValue())
+.build();
+static final PropertyDescriptor PROGRESSIVE_COMMITS = new PropertyDescriptor.Builder()
+.name("progressive-commits")
+.displayName("Commit After Each Batch")
--- End diff --

I'm a little confused here about the term "batch".

It doesn't seem directly related to the Batch Size property (since the latter is kind of a server-side thing, like a JDBC "fetch size"?), and in the code a "batch" seems to refer to the number of files set in Results Per Flowfile. Can you explain a little more about what's going on with the progressive commits? If I have Results Per Flowfile set to 100 and Batch Size set to 1000, would I get 10 flowfiles committed at once as one "batch"? Or is it always one commit per flowfile (if Commit After Each Batch is set)?
---
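The question above is mostly arithmetic, so here is a worked example with a hypothetical 2,500 matching documents, Results Per Flowfile = 100, and Batch (fetch) Size = 1000. The fetch size governs cursor round trips to the server (approximately, ignoring the server's per-batch size cap), while Results Per Flowfile governs output: about ceil(2500 / 1000) = 3 round trips and ceil(2500 / 100) = 25 flowfiles. If a commit follows each flowfile, as the author's reply describes, that is 25 commits rather than one commit per 10 flowfiles. The helper names are hypothetical.

```java
// Back-of-the-envelope counts for the settings discussed above (hypothetical helper).
final class BatchArithmeticSketch {
    private BatchArithmeticSketch() {}

    // Ceiling division for non-negative a and positive b.
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    // Approximate cursor round trips, ignoring the server's per-batch size cap.
    static long roundTrips(long documents, int fetchSize) {
        return ceilDiv(documents, fetchSize);
    }

    // Output flowfiles when documents are grouped resultsPerFlowFile at a time.
    static long flowFiles(long documents, int resultsPerFlowFile) {
        return ceilDiv(documents, resultsPerFlowFile);
    }
}
```

The two settings are independent: changing the fetch size alters only how often the driver goes back to the server, not how many flowfiles (or per-flowfile commits) are produced.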
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r166320282

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
 }
 }
-private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
-return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
+private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
+return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
 : mapper.writer();
 }
-private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
+private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
 FlowFile flowFile = session.create();
 flowFile = session.write(flowFile, new OutputStreamCallback() {
 @Override
 public void process(OutputStream out) throws IOException {
 out.write(payload.getBytes("UTF-8"));
 }
 });
-flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+Map<String, String> attrs = new HashMap<>();
+attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+if (count != null) {
+attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+attrs.put(PROGRESS_END, String.valueOf(index));
+attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+}
+flowFile = session.putAllAttributes(flowFile, attrs);
 session.getProvenanceReporter().receive(flowFile, getURI(context));
+
 session.transfer(flowFile, REL_SUCCESS);
+if (doCommit) {
+session.commit();
+}
--- End diff --

Yeah, that's what I wanted to understand. Along with a flowfile per batch, it writes the progress (if enabled)? LGTM. +1
---
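Assuming `index` counts the documents written so far, the progress attributes set in `writeBatch` above describe a half-open range `[start, end)` within the result set. The sketch below reproduces that computation in plain Java with the attribute names from the `@WritesAttributes` annotation; the class is hypothetical. One deliberate difference, offered as an assumption rather than a finding: it subtracts the number of documents actually in the segment, so a final, shorter flowfile still gets a correct start. If the configured Results Per Flowfile were passed as `batchSize` for that last segment, `index - batchSize` would undershoot.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical re-creation of the attribute computation in writeBatch.
final class ProgressAttributesSketch {
    // Attribute names as declared in the @WritesAttributes annotation under review.
    static final String PROGRESS_ESTIMATE = "progress.estimate";
    static final String PROGRESS_START = "progress.segment.start";
    static final String PROGRESS_END = "progress.segment.end";

    private ProgressAttributesSketch() {}

    /**
     * @param count      estimated total from the count query, or null when estimation is off
     * @param index      number of documents written so far, including this segment
     * @param segmentLen number of documents actually contained in this flowfile
     */
    static Map<String, String> attributes(Long count, long index, int segmentLen) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("mime.type", "application/json"); // value of CoreAttributes.MIME_TYPE.key()
        if (count != null) {
            attrs.put(PROGRESS_START, String.valueOf(index - segmentLen));
            attrs.put(PROGRESS_END, String.valueOf(index));
            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
        }
        return attrs;
    }
}
```

For a last flowfile holding 30 of 230 documents, this yields start 200 and end 230 even if Results Per Flowfile is 50.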
Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r166145010

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
 }
 }
-private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
-return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
+private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
+return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
 : mapper.writer();
 }
-private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
+private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
 FlowFile flowFile = session.create();
 flowFile = session.write(flowFile, new OutputStreamCallback() {
 @Override
 public void process(OutputStream out) throws IOException {
 out.write(payload.getBytes("UTF-8"));
 }
 });
-flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+Map<String, String> attrs = new HashMap<>();
+attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+if (count != null) {
+attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+attrs.put(PROGRESS_END, String.valueOf(index));
+attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+}
+flowFile = session.putAllAttributes(flowFile, attrs);
 session.getProvenanceReporter().receive(flowFile, getURI(context));
+
 session.transfer(flowFile, REL_SUCCESS);
+if (doCommit) {
+session.commit();
+}
--- End diff --

The commit has two goals:

1. Make progressive commits a configurable option.
2. Enable people to run a count() on the collection and put progress markers on each flowfile.

> the current version also transfers a flowfile for every batch, right?

It can do 1 result/flowfile or batches of results in one flowfile. The TL;DR version of why I did this is that I have a client who sometimes pulls down 100GB+ at once from Mongo and, without these features, would have to check the size of the content repository to approximate what percentage has been downloaded.
---
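For the use case described here (tracking how much of a very large pull has arrived), a downstream consumer could turn the progress attributes into a percentage. This is a hypothetical helper, not part of NiFi; it assumes the attribute names from the `@WritesAttributes` annotation and caps the result at 100% because `progress.estimate` comes from a count taken before the query runs, and the collection may change in between.

```java
import java.util.Map;
import java.util.OptionalDouble;

// Hypothetical consumer-side helper for the progress attributes.
final class ProgressPercentSketch {
    private ProgressPercentSketch() {}

    // Returns the estimated percentage complete, or empty when the flowfile
    // carries no progress attributes (estimation disabled).
    static OptionalDouble percentComplete(Map<String, String> attributes) {
        String end = attributes.get("progress.segment.end");
        String estimate = attributes.get("progress.estimate");
        if (end == null || estimate == null) {
            return OptionalDouble.empty();
        }
        long total = Long.parseLong(estimate);
        if (total <= 0) {
            // Nothing was expected, so report the pull as complete.
            return OptionalDouble.of(100.0);
        }
        double pct = 100.0 * Long.parseLong(end) / total;
        // The count is only an estimate; documents added after it ran can push past 100%.
        return OptionalDouble.of(Math.min(pct, 100.0));
    }
}
```

A flowfile with `progress.segment.end` = 250 and `progress.estimate` = 1000 reports 25%.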
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r165991021

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -51,14 +53,22 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
+@WritesAttributes( value = {
+@WritesAttribute(attribute = "progress.estimate", description = "The estimated total documents that match the query. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.start", description = "Where the first part of the segment is in the total result set. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.end", description = "Where the last part of the segment is in the total result set. Written if estimation is enabled."),
+@WritesAttribute(attribute = "progress.index", description = "When results are written one-by-one to flowfiles, this is is set to indicate estimated progress. Written if estimation is enabled.")
--- End diff --

Typo: this is is set
---
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r165990218

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -126,18 +142,37 @@ public ValidationResult validate(final String subject, final String value, final
 .expressionLanguageSupported(true)
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
+static final PropertyDescriptor ESTIMATE_PROGRESS = new PropertyDescriptor.Builder()
+.name("estimate-progress")
+.displayName("Estimate Progress")
+.description("If enabled, a count query will be run first, using the configured query, and attributes will be added to each flowfile showing how far they are into the result set.")
+.required(true)
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.allowableValues(GM_TRUE, GM_FALSE)
+.defaultValue(GM_FALSE.getValue())
+.build();
+static final PropertyDescriptor PROGRESSIVE_COMMITS = new PropertyDescriptor.Builder()
+.name("progressive-commits")
+.displayName("Commit After Each Batch")
+.description("If the result set is broken up into multiple flowfiles, this option can be used to commit after each flowfile is writte. " +
--- End diff --

Typo: after each flowfile is *written.*
---
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r165994104

--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
 }
 }
-private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
-return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
+private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
+return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
 : mapper.writer();
 }
-private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
+private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
 FlowFile flowFile = session.create();
 flowFile = session.write(flowFile, new OutputStreamCallback() {
 @Override
 public void process(OutputStream out) throws IOException {
 out.write(payload.getBytes("UTF-8"));
 }
 });
-flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+Map<String, String> attrs = new HashMap<>();
+attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+if (count != null) {
+attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+attrs.put(PROGRESS_END, String.valueOf(index));
+attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+}
+flowFile = session.putAllAttributes(flowFile, attrs);
 session.getProvenanceReporter().receive(flowFile, getURI(context));
+
 session.transfer(flowFile, REL_SUCCESS);
+if (doCommit) {
+session.commit();
+}
--- End diff --

Is the motivation behind this change to add these progress attributes to the flowfiles and to commit after every batch?

The reason I'm asking is that the associated JIRA says the current GetMongo gives the perception "that it has hung to a user who is pulling a very large data set", but the current version also transfers a flowfile for every batch, right? If the point of that JIRA is to add the progress to the flowfiles and commit after every batch, then it looks good to me.
---
GitHub user MikeThomsen opened a pull request:
https://github.com/apache/nifi/pull/2448

NIFI-4838 Added configurable progressive commits to GetMongo and also…

…added attributes that show the progress in the result set that each flowfile represents.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MikeThomsen/nifi NIFI-4838

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2448.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2448

commit 765720f7d087ec8b45ad6888f93910f8f6fdad45
Author: Mike Thomsen
Date: 2018-02-04T21:51:47Z

NIFI-4838 Added configurable progressive commits to GetMongo and also added attributes that show the progress in the result set that each flowfile represents.
---