[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-05-24 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r190665372
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -51,14 +53,22 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
+@WritesAttributes( value = {
+@WritesAttribute(attribute = "progress.estimate", description = "The 
estimated total documents that match the query. Written if estimation is 
enabled."),
+@WritesAttribute(attribute = "progress.segment.start", description = 
"Where the first part of the segment is in the total result set. Written if 
estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.end", description = 
"Where the last part of the segment is in the total result set. Written if 
estimation is enabled."),
+@WritesAttribute(attribute = "progress.index", description = "When 
results are written one-by-one to flowfiles, this is is set to indicate 
estimated progress. Written if estimation is enabled.")
--- End diff --

Done.


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-05-24 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r190658948
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
 ---
@@ -67,20 +69,23 @@
 
 @Before
 public void setup() {
-runner = TestRunners.newTestRunner(GetMongo.class);
+runner = TestRunners.newTestRunner(TestGetMongo.class);
 runner.setVariable("uri", MONGO_URI);
 runner.setVariable("db", DB_NAME);
 runner.setVariable("collection", COLLECTION_NAME);
 runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
 runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
 runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, 
"${collection}");
-runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
+runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.GM_TRUE);
 runner.setIncomingConnection(false);
+runner.setValidateExpressionUsage(true);
--- End diff --

Removed.


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-05-24 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r190658775
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 ---
@@ -123,6 +124,7 @@
 .displayName("Batch Size")
 .description("The number of elements returned from the server 
in one batch.")
 .required(false)
+.expressionLanguageSupported(true)
--- End diff --

Done.


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-05-24 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r190658748
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 ---
@@ -115,6 +115,7 @@
 .description("How many results to put into a flowfile at once. 
The whole body will be treated as a JSON array of results.")
 .required(false)
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(true)
--- End diff --

Done.


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-05-24 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r190626960
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 ---
@@ -115,6 +115,7 @@
 .description("How many results to put into a flowfile at once. 
The whole body will be treated as a JSON array of results.")
 .required(false)
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(true)
--- End diff --

Can you use the scope?
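For context, here is a hedged sketch. Newer NiFi APIs add an overload of `expressionLanguageSupported(...)` that takes an `ExpressionLanguageScope` (`NONE`, `VARIABLE_REGISTRY`, `FLOWFILE_ATTRIBUTES`) instead of a boolean. GetMongo is `INPUT_FORBIDDEN`, so there is no incoming flowfile whose attributes could be evaluated. That makes `VARIABLE_REGISTRY` presumably the scope meant here. The stdlib-only model below shows what variable-registry-only evaluation amounts to for a positive-integer property such as Results Per FlowFile. The class and method names are hypothetical, and the model handles only plain `${name}` references, not full Expression Language.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified model of variable-registry-scoped evaluation.
// Only plain ${name} references are handled; real NiFi Expression Language
// also supports functions, escaping, and nested expressions.
final class VariableScopeDemo {
    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replace each ${name} with its registry value; unknown names resolve to "".
    static String resolve(String raw, Map<String, String> registry) {
        Matcher m = REF.matcher(raw);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
            String value = registry.getOrDefault(m.group(1), "");
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    // Validate the *evaluated* value, in the spirit of POSITIVE_INTEGER_VALIDATOR.
    static int resolvePositiveInt(String raw, Map<String, String> registry) {
        // Integer.parseInt throws NumberFormatException if the result is not numeric.
        int n = Integer.parseInt(resolve(raw, registry).trim());
        if (n <= 0) {
            throw new IllegalArgumentException("not a positive integer: " + n);
        }
        return n;
    }

    public static void main(String[] args) {
        Map<String, String> registry = Map.of("results.per.flowfile", "100");
        System.out.println(resolvePositiveInt("${results.per.flowfile}", registry));
    }
}
```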


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-05-24 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r190627122
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -51,14 +53,22 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
+@WritesAttributes( value = {
+@WritesAttribute(attribute = "progress.estimate", description = "The 
estimated total documents that match the query. Written if estimation is 
enabled."),
+@WritesAttribute(attribute = "progress.segment.start", description = 
"Where the first part of the segment is in the total result set. Written if 
estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.end", description = 
"Where the last part of the segment is in the total result set. Written if 
estimation is enabled."),
+@WritesAttribute(attribute = "progress.index", description = "When 
results are written one-by-one to flowfiles, this is is set to indicate 
estimated progress. Written if estimation is enabled.")
--- End diff --

Typo is still here


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-05-24 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r190629391
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
 ---
@@ -67,20 +69,23 @@
 
 @Before
 public void setup() {
-runner = TestRunners.newTestRunner(GetMongo.class);
+runner = TestRunners.newTestRunner(TestGetMongo.class);
 runner.setVariable("uri", MONGO_URI);
 runner.setVariable("db", DB_NAME);
 runner.setVariable("collection", COLLECTION_NAME);
 runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
 runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
 runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, 
"${collection}");
-runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
+runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.GM_TRUE);
 runner.setIncomingConnection(false);
+runner.setValidateExpressionUsage(true);
--- End diff --

I believe that's the default here, so it's probably best not to have this line.


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-05-24 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r190627002
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 ---
@@ -123,6 +124,7 @@
 .displayName("Batch Size")
 .description("The number of elements returned from the server 
in one batch.")
 .required(false)
+.expressionLanguageSupported(true)
--- End diff --

Can you use the scope?


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-03-05 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r172216534
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -121,34 +136,53 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 
-static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
-.name("Batch Size")
-.description("The number of elements returned from the server 
in one batch")
+static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("Fetch Size")
--- End diff --

I missed that it was `name` and not `displayName`.

Maybe what should be done here is to revert that change, and then have the 
commits happen either after each flowfile (when grouped into big flowfiles) or 
after each batch, as defined by the Batch Size property, for the 1:1 result/flowfile option.
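Here is a minimal model of the commit placement proposed above. It assumes that Batch Size keeps its server-side meaning and is reused only as the commit interval in one-result-per-flowfile mode. `CommitPlanDemo` and `commitPoints` are hypothetical names. The method returns the running result count at each point where a commit would happen.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed commit placement; not GetMongo's actual code.
final class CommitPlanDemo {
    // Returns the running result count at each point where session.commit() would run.
    static List<Integer> commitPoints(int totalResults, int resultsPerFlowFile, int batchSize) {
        // Grouped output: commit after every flowfile (each holds resultsPerFlowFile results).
        // One result per flowfile: commit after every batchSize flowfiles instead.
        int step = resultsPerFlowFile > 1 ? resultsPerFlowFile : batchSize;
        if (step < 1) {
            throw new IllegalArgumentException("commit interval must be positive");
        }
        List<Integer> points = new ArrayList<>();
        for (int written = step; written < totalResults; written += step) {
            points.add(written);
        }
        if (totalResults > 0) {
            points.add(totalResults); // final commit covers any partial remainder
        }
        return points;
    }

    public static void main(String[] args) {
        System.out.println(commitPoints(250, 100, 1000)); // [100, 200, 250]
        System.out.println(commitPoints(250, 1, 50));     // [50, 100, 150, 200, 250]
    }
}
```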


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-03-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r172198724
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -121,34 +136,53 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 
-static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
-.name("Batch Size")
-.description("The number of elements returned from the server 
in one batch")
+static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("Fetch Size")
--- End diff --

Changing a property's name will cause it to be recognized as a "different" 
property. This will cause all existing flows containing GetMongo to become 
invalid (the "Batch Size" property will show up at the bottom containing the 
existing value, but the framework will claim the processor is invalid because 
Batch Size is not a supported property).
That's why we use displayName() for the user-friendly name, so we can 
change it at will. I realize you did not have that luxury here, but we still 
would have to keep the name("Batch Size") and add displayName("Fetch Size"). 
This will be confusing in the code (until we change it for real, perhaps in 
NiFi 2.0?) but can be accompanied by documentation.
Also I'm still a little leery of changing the existing property to "fetch" 
vs "batch", then using "batch" in a different context in the added property. 
Would like to get some input from others on this as well.
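The following simplified model illustrates the point about names. It assumes, as described above, that saved flows store property values keyed by `name`, while `displayName` is only a UI label. `PropertyNameDemo` and `Prop` are illustrative stand-ins, not NiFi classes. In the real builder, the fix is the one suggested above: keep `.name("Batch Size")` and add `.displayName("Fetch Size")`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model, not NiFi framework code: a saved flow keeps property values
// keyed by the property's name; displayName is only what the UI shows.
final class PropertyNameDemo {
    static final class Prop {
        final String name;        // stable identifier persisted with the flow
        final String displayName; // user-facing label, safe to change
        Prop(String name, String displayName) {
            this.name = name;
            this.displayName = displayName;
        }
    }

    // Look up the persisted value for a property definition, as a restored flow would.
    static String restoredValue(Map<String, String> savedFlow, Prop prop) {
        return savedFlow.get(prop.name);
    }

    public static void main(String[] args) {
        Map<String, String> savedFlow = new HashMap<>();
        savedFlow.put("Batch Size", "500"); // value saved by an older GetMongo

        Prop renamed = new Prop("Fetch Size", "Fetch Size");   // name changed
        Prop relabeled = new Prop("Batch Size", "Fetch Size"); // only displayName changed

        System.out.println(restoredValue(savedFlow, renamed));   // null: saved value is orphaned
        System.out.println(restoredValue(savedFlow, relabeled)); // 500: saved value still found
    }
}
```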


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-03-03 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r172019186
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -129,26 +144,44 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 static final PropertyDescriptor RESULTS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
-.name("results-per-flowfile")
-.displayName("Results Per FlowFile")
-.description("How many results to put into a flowfile at once. 
The whole body will be treated as a JSON array of results.")
-.required(false)
-.expressionLanguageSupported(true)
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
+.name("results-per-flowfile")
+.displayName("Results Per FlowFile")
+.description("How many results to put into a flowfile at once. The 
whole body will be treated as a JSON array of results.")
+.required(false)
+.expressionLanguageSupported(true)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.build();
+static final PropertyDescriptor ESTIMATE_PROGRESS = new 
PropertyDescriptor.Builder()
+.name("estimate-progress")
+.displayName("Estimate Progress")
+.description("If enabled, a count query will be run first, using 
the configured query, and attributes will be added to each flowfile showing how 
far they are into the result set.")
+.required(true)
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.allowableValues(GM_TRUE, GM_FALSE)
+.defaultValue(GM_FALSE.getValue())
+.build();
+static final PropertyDescriptor PROGRESSIVE_COMMITS = new 
PropertyDescriptor.Builder()
+.name("progressive-commits")
+.displayName("Commit After Each Batch")
--- End diff --

I spent a little time reworking the documentation on the 
properties, and I think I struck the right balance of renaming/redocumenting. 
As an added benefit, I think I made the Batch Size property (now Fetch Size) 
much clearer to newbs.


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-02-28 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r171302727
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -129,26 +144,44 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 static final PropertyDescriptor RESULTS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
-.name("results-per-flowfile")
-.displayName("Results Per FlowFile")
-.description("How many results to put into a flowfile at once. 
The whole body will be treated as a JSON array of results.")
-.required(false)
-.expressionLanguageSupported(true)
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
+.name("results-per-flowfile")
+.displayName("Results Per FlowFile")
+.description("How many results to put into a flowfile at once. The 
whole body will be treated as a JSON array of results.")
+.required(false)
+.expressionLanguageSupported(true)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.build();
+static final PropertyDescriptor ESTIMATE_PROGRESS = new 
PropertyDescriptor.Builder()
+.name("estimate-progress")
+.displayName("Estimate Progress")
+.description("If enabled, a count query will be run first, using 
the configured query, and attributes will be added to each flowfile showing how 
far they are into the result set.")
+.required(true)
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.allowableValues(GM_TRUE, GM_FALSE)
+.defaultValue(GM_FALSE.getValue())
+.build();
+static final PropertyDescriptor PROGRESSIVE_COMMITS = new 
PropertyDescriptor.Builder()
+.name("progressive-commits")
+.displayName("Commit After Each Batch")
--- End diff --

It works in coordination with the Results Per FlowFile property. The idea 
is to emulate ExecuteSQL: once a batch of X results has been built up 
in the processor and written to a flowfile, it commits.

I'm tempted to change the batch size property's display name to be 
something like Query Fetch Size. I think Results Per Flowfile is probably even 
clearer than Batch Size for this. Thoughts?
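Here is a stdlib-only sketch of the ExecuteSQL-style behavior described above. It rests on stated assumptions. `FakeSession` is a hypothetical stand-in for NiFi's `ProcessSession`, and results are plain strings. The unconditional final `commit()` mirrors the commit `AbstractProcessor` performs after `onTrigger` returns. The same flowfiles are produced either way; progressive commits only change how many commits occur, and therefore when downstream processors can see each flowfile.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stdlib-only model of "commit after each batch".
// FakeSession stands in for NiFi's ProcessSession; results are plain strings.
final class ProgressiveCommitDemo {
    static final class FakeSession {
        final List<List<String>> pending = new ArrayList<>();   // transferred, not yet committed
        final List<List<String>> committed = new ArrayList<>(); // visible to downstream processors
        int commitCount = 0;

        void transfer(List<String> flowFileContent) {
            pending.add(flowFileContent);
        }

        void commit() {
            committed.addAll(pending);
            pending.clear();
            commitCount++;
        }
    }

    static void run(List<String> results, int resultsPerFlowFile, boolean commitEachBatch, FakeSession session) {
        if (resultsPerFlowFile < 1) {
            throw new IllegalArgumentException("resultsPerFlowFile must be positive");
        }
        List<String> batch = new ArrayList<>();
        for (String result : results) {
            batch.add(result);
            if (batch.size() == resultsPerFlowFile) {
                session.transfer(batch);      // one flowfile per full batch
                if (commitEachBatch) {
                    session.commit();         // downstream can pick it up immediately
                }
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            session.transfer(batch);          // leftover partial batch
        }
        session.commit();                     // like the commit after onTrigger returns
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            docs.add("doc" + i);
        }
        FakeSession progressive = new FakeSession();
        run(docs, 100, true, progressive);
        FakeSession single = new FakeSession();
        run(docs, 100, false, single);
        System.out.println(progressive.commitCount + " vs " + single.commitCount); // 3 vs 1
    }
}
```

Consider 250 results with Results Per FlowFile set to 100. Both runs yield flowfiles of 100, 100, and 50 results. With progressive commits enabled, there are three commits instead of one.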


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-02-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r171280456
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -129,26 +144,44 @@
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
 static final PropertyDescriptor RESULTS_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
-.name("results-per-flowfile")
-.displayName("Results Per FlowFile")
-.description("How many results to put into a flowfile at once. 
The whole body will be treated as a JSON array of results.")
-.required(false)
-.expressionLanguageSupported(true)
-.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-.build();
+.name("results-per-flowfile")
+.displayName("Results Per FlowFile")
+.description("How many results to put into a flowfile at once. The 
whole body will be treated as a JSON array of results.")
+.required(false)
+.expressionLanguageSupported(true)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.build();
+static final PropertyDescriptor ESTIMATE_PROGRESS = new 
PropertyDescriptor.Builder()
+.name("estimate-progress")
+.displayName("Estimate Progress")
+.description("If enabled, a count query will be run first, using 
the configured query, and attributes will be added to each flowfile showing how 
far they are into the result set.")
+.required(true)
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.allowableValues(GM_TRUE, GM_FALSE)
+.defaultValue(GM_FALSE.getValue())
+.build();
+static final PropertyDescriptor PROGRESSIVE_COMMITS = new 
PropertyDescriptor.Builder()
+.name("progressive-commits")
+.displayName("Commit After Each Batch")
--- End diff --

I'm a little confused here about the term "batch". It doesn't seem directly 
related to the Batch Size property (since the latter is kind of a server-side 
thing, like a JDBC "fetch size"?), and in the code a "batch" seems to refer to 
the number of files set in Results Per Flowfile.
Can you explain a little more about what's going on with the progressive 
commits? If I have Results per Flowfile set to 100 and Batch Size set to 1000, 
would I get 10 flow files committed at once as once "batch"? Or is it always 
one commit per flowfile (if Commit After Each Batch is set)?


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-02-06 Thread zenfenan
Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r166320282
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
 }
 }
 
-private ObjectWriter getObjectWriter(ObjectMapper mapper, String 
ppSetting) {
-return ppSetting.equals(YES_PP.getValue()) ? 
mapper.writerWithDefaultPrettyPrinter()
+private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean 
ppSetting) {
+return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
 : mapper.writer();
 }
 
-private void writeBatch(String payload, ProcessContext context, 
ProcessSession session) {
+private void writeBatch(String payload, ProcessContext context, 
ProcessSession session, boolean doCommit, Long count, long index, int 
batchSize) {
 FlowFile flowFile = session.create();
 flowFile = session.write(flowFile, new OutputStreamCallback() {
 @Override
 public void process(OutputStream out) throws IOException {
 out.write(payload.getBytes("UTF-8"));
 }
 });
-flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/json");
+Map<String, String> attrs = new HashMap<>();
+attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+if (count != null) {
+attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+attrs.put(PROGRESS_END, String.valueOf(index));
+attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+}
+flowFile = session.putAllAttributes(flowFile, attrs);
 session.getProvenanceReporter().receive(flowFile, getURI(context));
+
 session.transfer(flowFile, REL_SUCCESS);
+if (doCommit) {
+session.commit();
+}
--- End diff --

Yeah, that's what I wanted to understand. Along with a flowfile per batch, 
it writes the progress (if enabled)? LGTM. +1
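To make the attribute arithmetic in `writeBatch()` concrete, here is a stdlib-only sketch of just the map-building step. It assumes that the `PROGRESS_START`, `PROGRESS_END`, and `PROGRESS_ESTIMATE` constants hold the names declared in the `@WritesAttribute` annotations. It also assumes that `index` is the number of results emitted so far, including the current batch. The class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only sketch of the attribute map built in writeBatch() in the diff.
// Assumption: the constants below hold the names from the @WritesAttribute annotations.
final class ProgressAttributesDemo {
    static final String PROGRESS_START = "progress.segment.start";
    static final String PROGRESS_END = "progress.segment.end";
    static final String PROGRESS_ESTIMATE = "progress.estimate";

    // count is null when Estimate Progress is disabled; index counts results emitted so far.
    static Map<String, String> progressAttributes(Long count, long index, int batchSize) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("mime.type", "application/json"); // CoreAttributes.MIME_TYPE.key()
        if (count != null) {
            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
            attrs.put(PROGRESS_END, String.valueOf(index));
            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
        }
        return attrs;
    }

    public static void main(String[] args) {
        // e.g. the third batch of 100 out of an estimated 1000 matching documents
        System.out.println(progressAttributes(1000L, 300L, 100));
    }
}
```

The `index - batchSize` formula has one consequence. Suppose the final, partial batch is passed the configured batch size rather than its actual length. Then `progress.segment.start` would come out lower than the true segment start, so the value the caller passes matters.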


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-02-05 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r166145010
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
 }
 }
 
-private ObjectWriter getObjectWriter(ObjectMapper mapper, String 
ppSetting) {
-return ppSetting.equals(YES_PP.getValue()) ? 
mapper.writerWithDefaultPrettyPrinter()
+private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean 
ppSetting) {
+return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
 : mapper.writer();
 }
 
-private void writeBatch(String payload, ProcessContext context, 
ProcessSession session) {
+private void writeBatch(String payload, ProcessContext context, 
ProcessSession session, boolean doCommit, Long count, long index, int 
batchSize) {
 FlowFile flowFile = session.create();
 flowFile = session.write(flowFile, new OutputStreamCallback() {
 @Override
 public void process(OutputStream out) throws IOException {
 out.write(payload.getBytes("UTF-8"));
 }
 });
-flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/json");
+Map<String, String> attrs = new HashMap<>();
+attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+if (count != null) {
+attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+attrs.put(PROGRESS_END, String.valueOf(index));
+attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+}
+flowFile = session.putAllAttributes(flowFile, attrs);
 session.getProvenanceReporter().receive(flowFile, getURI(context));
+
 session.transfer(flowFile, REL_SUCCESS);
+if (doCommit) {
+session.commit();
+}
--- End diff --

The commit has two goals:

1. Make progressive commits a configurable option.
2. Enable people to run a count() on the collection and put progress 
markers on each flowfile.

> the current version also transfers a flowfile for every batch, right?

It can do 1 result/flowfile or batches of results in one flowfile.

The TL;DR version of why I did this is that I have a client who sometimes pulls 
down 100GB+ at once from Mongo and, without these features, would have to check 
the size of the content repository to get an approximation of what percentage 
has been downloaded.
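For the use case described here, a downstream consumer could turn those attributes into an approximate percentage. This hypothetical helper, which is not part of NiFi, reads `progress.segment.end` and `progress.estimate` and clamps the result at 100%. The clamp is needed because the count query runs before the main query, so its result may drift from the real result size.

```java
import java.util.Map;

// Hypothetical downstream helper (not part of NiFi): approximate percent complete
// from the attributes GetMongo writes when Estimate Progress is enabled.
final class PercentCompleteDemo {
    static double percentComplete(Map<String, String> attrs) {
        String end = attrs.get("progress.segment.end");
        String estimate = attrs.get("progress.estimate");
        if (end == null || estimate == null) {
            throw new IllegalArgumentException("progress attributes missing; is estimation enabled?");
        }
        long total = Long.parseLong(estimate);
        if (total <= 0) {
            return 100.0; // nothing matched the count query
        }
        // The count is only an estimate, so clamp in case more documents come back than were counted.
        return Math.min(100.0, 100.0 * Long.parseLong(end) / total);
    }

    public static void main(String[] args) {
        Map<String, String> attrs = Map.of("progress.segment.end", "250", "progress.estimate", "1000");
        System.out.println(percentComplete(attrs)); // 25.0
    }
}
```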


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-02-05 Thread zenfenan
Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r165991021
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -51,14 +53,22 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
+@WritesAttributes( value = {
+@WritesAttribute(attribute = "progress.estimate", description = "The 
estimated total documents that match the query. Written if estimation is 
enabled."),
+@WritesAttribute(attribute = "progress.segment.start", description = 
"Where the first part of the segment is in the total result set. Written if 
estimation is enabled."),
+@WritesAttribute(attribute = "progress.segment.end", description = 
"Where the last part of the segment is in the total result set. Written if 
estimation is enabled."),
+@WritesAttribute(attribute = "progress.index", description = "When 
results are written one-by-one to flowfiles, this is is set to indicate 
estimated progress. Written if estimation is enabled.")
--- End diff --

Typo: this is is set


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-02-05 Thread zenfenan
Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r165990218
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -126,18 +142,37 @@ public ValidationResult validate(final String 
subject, final String value, final
 .expressionLanguageSupported(true)
 .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 .build();
+static final PropertyDescriptor ESTIMATE_PROGRESS = new 
PropertyDescriptor.Builder()
+.name("estimate-progress")
+.displayName("Estimate Progress")
+.description("If enabled, a count query will be run first, using 
the configured query, and attributes will be added to each flowfile showing how 
far they are into the result set.")
+.required(true)
+.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+.allowableValues(GM_TRUE, GM_FALSE)
+.defaultValue(GM_FALSE.getValue())
+.build();
+static final PropertyDescriptor PROGRESSIVE_COMMITS = new 
PropertyDescriptor.Builder()
+.name("progressive-commits")
+.displayName("Commit After Each Batch")
+.description("If the result set is broken up into multiple 
flowfiles, this option can be used to commit after each flowfile is writte. " +
--- End diff --

Typo: after each flowfile is *written.* 


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-02-05 Thread zenfenan
Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2448#discussion_r165994104
  
--- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
 }
 }
 
-private ObjectWriter getObjectWriter(ObjectMapper mapper, String 
ppSetting) {
-return ppSetting.equals(YES_PP.getValue()) ? 
mapper.writerWithDefaultPrettyPrinter()
+private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean 
ppSetting) {
+return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
 : mapper.writer();
 }
 
-private void writeBatch(String payload, ProcessContext context, 
ProcessSession session) {
+private void writeBatch(String payload, ProcessContext context, 
ProcessSession session, boolean doCommit, Long count, long index, int 
batchSize) {
 FlowFile flowFile = session.create();
 flowFile = session.write(flowFile, new OutputStreamCallback() {
 @Override
 public void process(OutputStream out) throws IOException {
 out.write(payload.getBytes("UTF-8"));
 }
 });
-flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/json");
+Map<String, String> attrs = new HashMap<>();
+attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+if (count != null) {
+attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+attrs.put(PROGRESS_END, String.valueOf(index));
+attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+}
+flowFile = session.putAllAttributes(flowFile, attrs);
 session.getProvenanceReporter().receive(flowFile, getURI(context));
+
 session.transfer(flowFile, REL_SUCCESS);
+if (doCommit) {
+session.commit();
+}
--- End diff --

Is the motivation behind this change to add these progress attributes to 
the flowfiles and commit after every batch? I'm asking because 
the associated JIRA mentions the current GetMongo gives the perception "that it 
has hung to a user who is pulling a very large data set", but the current 
version also transfers a flowfile for every batch, right? If the intent of that 
JIRA is to add the progress to the flowfiles and commit after every batch, 
then it looks good to me. 


---


[GitHub] nifi pull request #2448: NIFI-4838 Added configurable progressive commits to...

2018-02-04 Thread MikeThomsen
GitHub user MikeThomsen opened a pull request:

https://github.com/apache/nifi/pull/2448

NIFI-4838 Added configurable progressive commits to GetMongo and also…

… added attributes that show the progress in the result set that each 
flowfile represents.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MikeThomsen/nifi NIFI-4838

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2448.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2448


commit 765720f7d087ec8b45ad6888f93910f8f6fdad45
Author: Mike Thomsen 
Date:   2018-02-04T21:51:47Z

NIFI-4838 Added configurable progressive commits to GetMongo and also added 
attributes that show the progress in the result set that each flowfile 
represents.




---