Thanks Chris. That’s exactly right.

Given that you’re seeing that Max Bin Age is the cause, the solution would be 
to increase the Max Bin Age if you want fewer FlowFiles.

The data is merged when any one of the following conditions is met:

- Minimum Number of Records is reached AND Minimum Bin Size is reached
OR
- Maximum Number of Records is reached OR Maximum Bin Size is reached
OR
- Max Bin Age is reached
OR
- Maximum Number of Bins is reached AND a new FlowFile is encountered that 
belongs in a different bin than any of the existing ones (only valid if using a 
Correlation Attribute).

So in your case, you’re not hitting the minimum number of records, but you are 
hitting the Max Bin Age so it’s merging.
The idea behind Max Bin Age is that it’s basically a timeout. It prevents data 
from stacking up for too long, introducing too large of a latency.
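
As a rough illustration of the conditions listed above, here is a minimal 
Python sketch. It is a simplified model, not NiFi's actual implementation: the 
Bin and Settings classes, the merge_reason function and the bins_exhausted flag 
are hypothetical names, and the maximum values and the 1200-record bin are 
assumed numbers, since the thread only gives the minimums and the Max Bin Age.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Bin:
    """Hypothetical model of one bin (not a NiFi class)."""
    records: int
    size_bytes: int
    age_seconds: float


@dataclass
class Settings:
    """Hypothetical stand-in for the processor properties named above."""
    min_records: int
    max_records: int
    min_bin_size: int
    max_bin_size: int
    max_bin_age_seconds: float


def merge_reason(b: Bin, s: Settings, bins_exhausted: bool = False) -> Optional[str]:
    """Return why the bin would be merged, or None if it keeps waiting."""
    if b.records >= s.min_records and b.size_bytes >= s.min_bin_size:
        return "minimum thresholds reached"
    if b.records >= s.max_records or b.size_bytes >= s.max_bin_size:
        return "maximum threshold reached"
    if b.age_seconds >= s.max_bin_age_seconds:
        return "max bin age reached"
    if bins_exhausted:
        # All bins are in use and a FlowFile for a new correlation value arrived.
        return "no free bin for a new correlation value"
    return None


MB = 1024 * 1024
# Minimums and Max Bin Age as in the original email; maximums are assumed.
settings = Settings(min_records=5000, max_records=10000,
                    min_bin_size=5 * MB, max_bin_size=10 * MB,
                    max_bin_age_seconds=10)

young = Bin(records=1200, size_bytes=1 * MB, age_seconds=3)
expired = Bin(records=1200, size_bytes=1 * MB, age_seconds=10)
print(merge_reason(young, settings))    # None
print(merge_reason(expired, settings))  # max bin age reached
```

With minimums set far above the real data volume, the only condition that can 
ever fire in this model is the age timeout, which matches the provenance 
message you are seeing.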

Now, that said, what you’re after is really not something that’s as easily 
supported by this Processor, because you’re not really looking to pack together 
Records in order to build a larger bundle. You’re looking to pack together 
Records in order to re-join specific sets of Records. So you might actually 
want to consider using MergeContent instead of MergeRecord. Assuming that your 
data is in JSON format, you can use MergeContent’s header/footer/demarcator 
properties to ensure that you still have valid JSON. But with MergeContent you 
specify min/max based on number of FlowFiles, not number of Records. So you can 
set Minimum Number of Entries to 3 (assuming you have 3 nodes in your cluster). 
That will wait for 3 FlowFiles, presumably one from each node. And set a Max 
Bin Age short enough that even if a node doesn’t send because the node is 
stopped, you still merge data from the other 2 nodes or whatever.
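
To make the header/footer/demarcator idea concrete, here is a small Python 
sketch of what concatenating FlowFile contents with those three values would 
produce. It only mimics the effect; the function name, the choice of "[", "]" 
and "," and the sample node reports are assumptions for illustration, not 
values taken from your flow.

```python
import json


def concat_with_delimiters(contents, header="[", footer="]", demarcator=","):
    """Join FlowFile contents the way a header/footer/demarcator merge would."""
    return header + demarcator.join(contents) + footer


# Hypothetical payloads, one per node.
node_reports = [
    '{"node": "a", "queued": 3}',
    '{"node": "b", "queued": 0}',
    '{"node": "c", "queued": 7}',
]

merged = concat_with_delimiters(node_reports)
parsed = json.loads(merged)  # raises ValueError if the result is not valid JSON
print(len(parsed))
```

If each incoming FlowFile is itself a JSON array, the same settings would give 
an array of arrays, which is still valid JSON but may need flattening 
downstream.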

Thanks
-Mark



On Aug 31, 2022, at 7:45 AM, Chris Sampson 
<[email protected]<mailto:[email protected]>> wrote:

For “Minimum Number of Records”, the docs [1] indicate that the field does 
support Expression Language but "will be evaluated using variable registry 
only”, i.e. it doesn’t use FlowFile attributes, which it appears you’re trying 
to do in your example within this email chain.

If your provenance is showing that "Records Merged due to: Bin has reached Max 
Bin Age”, wouldn’t it be a good idea to increase the “Max Bin Age” from the 
“10s” you indicate in your original email? If you set this to, say, “5 mins”, 
do you see the number of resultant FlowFiles reduce, with more input Records 
included within each output FlowFile?

Basically, your provenance seems to suggest that you need to allow a longer 
period of time for your data to reach the MergeRecord processor and be 
combined. My understanding from a quick look at the processor’s “Additional 
Details” [2] (see section “When a Bin is Merged”) is that the Bin will be 
merged & output once the “Max Bin Age” (if configured) is reached, regardless 
of whether the “Minimum Number of Records” has been reached. Likewise, I’d 
expect that the merged output would happen if “Maximum Number of Records” is 
reached irrespective of any “Max Bin Age” settings.


Caveat: I don’t really use MergeRecord

[1]: 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.MergeRecord/index.html
[2]: 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html


On 31 Aug 2022, at 09:22, Jens M. Kofoed 
<[email protected]<mailto:[email protected]>> wrote:

Hi
By decreasing the batch size for the SiteToSiteStatusReportingTask I get even 
more flowfiles. So just for testing I now have a total of 9 files (2.75 MB) in 
the incoming queue to the MergeRecord.
The total number of records is above 2000, so I have set the "Minimum Number of 
Records" to 1500 and the "Minimum Bin Size" to 2 MB.
The result is 3 flowfiles which all have "Records Merged due to: Bin has 
reached Max Bin Age"???? Why?
All 9 files should be merged into one file, since the total number of records 
exceeds the minimum.

Kind regards
Jens M. Kofoed

Den ons. 31. aug. 2022 kl. 09.50 skrev Jens M. Kofoed 
<[email protected]<mailto:[email protected]>>:
Hey Mark

I tried another idea to dynamically set the "Minimum Number of Records" by EL. 
Editing the field it says that EL is supported, so I tried this:
${record.count:minus(1):multiply(3)}

But the processor does not like this:
Perform Validation
nifi.mydomain.com:8443 - Component is invalid: 
'Component' is invalid because Failed to perform validation due to 
java.lang.NumberFormatException: For input string: ""

I got the same error if I just tried to set the EL to: ${record.count}

Is this a bug???

Kind regards
Jens


Den ons. 31. aug. 2022 kl. 09.24 skrev Jens M. Kofoed 
<[email protected]<mailto:[email protected]>>:
Hey Mark

Many thanks for your reply. But it's in fact the Details field which does not 
help me.
At 08:16:00 all 3 nodes generate a SiteToSiteStatusReport.
At 08:16:11.003 the MergeRecord has a JOIN event. Joining 2 files: "Records 
Merged due to: Bin has reached Max Bin Age"
At 08:16:11.008 the MergeRecord has another JOIN event. Joining 1 file: 
"Records Merged due to: Bin has reached Max Bin Age"

So one file is 0.005s younger than the other 2 files and therefore is not 
merged into the first bin of files. But how can we force all flowfiles to be 
merged into one flowfile?
If I set the minimum file size or record count to a value between the totals 
for 2 files and 3 files, it will trigger a merge. But when we create more 
flows, the record count and file size will increase and we will be back to the 
problem that not all files are merged into one.

kind regards
Jens

Den tir. 30. aug. 2022 kl. 15.40 skrev Mark Payne 
<[email protected]<mailto:[email protected]>>:
Hey Jens,

My recommendation is to take a look at the data provenance for MergeRecord 
(i.e., right-click on the Processor and go to Data Provenance.) Click the 
little ‘i’ icon on the left for one of the JOIN events.
There, it will show a “Details” field, which will tell you why it merged the 
data in the bin.
Once you understand why it’s merging the data with only 2 FlowFiles, you should 
be able to understand how to adjust your configuration to avoid doing that.

Thanks
-Mark


> On Aug 30, 2022, at 2:31 AM, Jens M. Kofoed 
> <[email protected]<mailto:jmkofoed.ube%[email protected]>> wrote:
>
> Hi all
>
> I'm running a 3 node cluster at version 1.16.2. I'm using the 
> SiteToSiteStatusReportingTask to monitor and check for any backpressure or 
> queues. I'm trying to merge all 3 reports into 1, but most of the time I 
> get 2 flowfiles after my MergeRecord.
>
> To be sure the nodes create the reports at the same time, the 
> SiteToSiteStatusReportingTask is scheduled as CRON driven, every 5 mins.
> The connection from the input port to the next processor has its "Load 
> Balance Strategy" set to Single node, to be sure all 3 reports end up on the 
> same node.
> In my MergeRecord the "Correlation Attribute Name" is set to 
> "reporting.task.uuid" which is the same for all 3 flowfiles.
> "Minimum Number of Records" is set to 5000, which is much higher than the 
> total number of records.
> "Minimum Bin Size" is set to 5 MB, which is also much higher than the total 
> size. "Maximum Number of Bins" is at default: 10
> "Max Bin Age" is set to 10 s.
>
> With these settings I was hoping that all 3 reports would be at the same 
> node within a few seconds, and that the MergeRecord would merge all 3 
> flowfiles into 1. But many times the MergeRecord outputs 2 flowfiles.
>
> Any ideas how to force all into one flowfile?
>
> Kind regards
> Jens M. Kofoed


