Thanks Chris. That’s exactly right. Given that provenance shows Max Bin Age is the cause, the solution would be to increase the Max Bin Age if you want fewer FlowFiles.
The data is merged when any one of the following conditions is met:

- Minimum Number of Records is reached AND Minimum Bin Size is reached, OR
- Maximum Number of Records is reached OR Maximum Bin Size is reached, OR
- Max Bin Age is reached, OR
- Maximum Number of Bins is reached AND a new FlowFile is encountered that belongs in a different bin than any of the existing ones (only valid if using a Correlation Attribute).

So in your case, you’re not hitting the minimum number of records, but you are hitting the Max Bin Age, so it’s merging. The idea behind Max Bin Age is that it’s basically a timeout. It prevents data from stacking up for too long and introducing too much latency.

Now, that said, what you’re after is really not something that’s easily supported by this Processor, because you’re not really looking to pack together Records in order to build a larger bundle. You’re looking to pack together Records in order to re-join specific sets of Records. So you might actually want to consider using MergeContent instead of MergeRecord.

Assuming that your data is in JSON format, you can use MergeContent’s header/footer/demarcator properties to ensure that you still have valid JSON. But with MergeContent you specify min/max based on number of FlowFiles, not number of Records. So you can set Minimum Number of Entries to 3 (assuming you have 3 nodes in your cluster). That will wait for 3 FlowFiles, presumably one from each node. And set a Max Bin Age short enough that even if a node doesn’t send because the node is stopped, you still merge data from the other 2 nodes or whatever.

Thanks
-Mark

On Aug 31, 2022, at 7:45 AM, Chris Sampson <[email protected]> wrote:

For “Minimum Number of Records”, the docs [1] indicate that the field does support Expression Language but “will be evaluated using variable registry only”, i.e. it doesn’t use FlowFile attributes, which it appears you’re trying to do in your example within this email chain.
If your provenance is showing that “Records Merged due to: Bin has reached Max Bin Age”, wouldn’t it be a good idea to increase the “Max Bin Age” from the “10s” you indicate in your original email? If you set this to, say, “5 mins”, do you see the number of resultant FlowFiles reduce, with more input Records included within each output FlowFile?

Basically, your provenance seems to suggest that you need to allow a longer period of time for your data to reach the MergeRecord processor and be combined.

My understanding from a quick look at the processor’s “Additional Details” [2] (see section “When a Bin is Merged”) is that the Bin will be merged and output once the “Max Bin Age” (if configured) is reached, regardless of whether the “Minimum Number of Records” has been reached. Likewise, I’d expect that the merged output would happen if “Maximum Number of Records” is reached, irrespective of any “Max Bin Age” settings.

Caveat: I don’t really use MergeRecord.

[1]: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.MergeRecord/index.html
[2]: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html

On 31 Aug 2022, at 09:22, Jens M. Kofoed <[email protected]> wrote:

Hi

By decreasing the batch size for the SiteToSiteStatusReportingTask I get even more FlowFiles. So just for testing I now have a total of 9 files (2.75 MB) in the incoming queue to the MergeRecord. The total number of records is above 2000, so I have set the "Minimum Number of Records" to 1500 and the "Minimum Bin Size" to 2 MB. The result is 3 FlowFiles, which all have "Records Merged due to: Bin has reached Max Bin Age"???? Why? All 9 files should be merged into one file, since the total number of records exceeds the minimum.

Kind regards
Jens M. Kofoed

On Wed, 31 Aug 2022 at 09:50, Jens M. Kofoed <[email protected]> wrote:

Hey Mark

I tried another idea to dynamically set the "Minimum Number of Records" by EL. When editing the field it says that EL is supported, so I tried this:

${record.count:minus(1):multiply(3)}

But the processor does not like this:

Perform Validation nifi.mydomain.com:8443 - Component is invalid: 'Component' is invalid because Failed to perform validation due to java.lang.NumberFormatException: For input string: ""

I got the same error when I just tried to set the EL to:

${record.count}

Is this a bug???

Kind regards
Jens

On Wed, 31 Aug 2022 at 09:24, Jens M. Kofoed <[email protected]> wrote:

Hey Mark

Many thanks for your reply. But it's in fact the Details field which does not help me.

At 08:16:00 all 3 nodes generate a SiteToSiteStatusReport.
At 08:16:11.003 the MergeRecord has a JOIN event. Joining 2 files: "Records Merged due to: Bin has reached Max Bin Age"
At 08:16:11.008 the MergeRecord has another JOIN event. Joining 1 file: "Records Merged due to: Bin has reached Max Bin Age"

So one file is 0.005s younger than the other 2 files and therefore is not merged into the first bin of files. But how can we force all FlowFiles to be merged into one FlowFile? If I set the minimum file size or number of records to fall between the totals for 2 files and 3 files, it will trigger a merge. But when we create more flows, the record count and file size will increase and we will be back to the problem that not all files will be merged into one.

Kind regards
Jens

On Tue, 30 Aug 2022 at 15:40, Mark Payne <[email protected]> wrote:

Hey Jens,

My recommendation is to take a look at the data provenance for MergeRecord (i.e., right-click on the Processor and go to Data Provenance). Click the little ‘i’ icon on the left for one of the JOIN events. There, it will show a “Details” field, which will tell you why it merged the data in the bin.
Once you understand why it’s merging the data with only 2 FlowFiles, you should be able to understand how to adjust your configuration to avoid doing that.

Thanks
-Mark

> On Aug 30, 2022, at 2:31 AM, Jens M. Kofoed <[email protected]> wrote:
>
> Hi all
>
> I'm running a 3 node cluster at version 1.16.2. I'm using the
> SiteToSiteStatusReportingTask to monitor and check for any backpressure or
> queues. I'm trying to merge all 3 reports into 1, but most of the time I
> get 2 FlowFiles after my MergeRecord.
>
> To be sure the nodes are creating the reports at the same time, the
> SiteToSiteStatusReportingTask is scheduled as CRON driven, every 5 mins.
> The connection from the input port to the next processor is set with "Load
> Balance Strategy" to Single node, to be sure all 3 reports are on the same
> node.
> In my MergeRecord the "Correlation Attribute Name" is set to
> "reporting.task.uuid", which is the same for all 3 FlowFiles.
> "Minimum Number of Records" is set to 5000, which is much higher than the
> total number of records.
> "Minimum Bin Size" is set to 5 MB, which is also much higher than the total
> size. "Maximum Number of Bins" is at default: 10
> "Max Bin Age" is set to 10 s.
>
> With these settings I was hoping that all 3 reports would be on the same
> node within a few seconds, and that the MergeRecord would merge all 3
> FlowFiles into 1. But many times the MergeRecord outputs 2 FlowFiles.
>
> Any ideas how to force all into one FlowFile?
>
> Kind regards
> Jens M. Kofoed
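
Editor's sketch: the merge conditions Mark lists in his reply can be modelled as a small decision function. This is only an illustration of the rules as stated in this thread, not NiFi's actual implementation. The names `BinConfig`, `BinState`, and `merge_reason` are hypothetical, the order in which the conditions are checked is an assumption, and the fourth condition (Maximum Number of Bins forcing out an existing bin) is left out for brevity. The record count and byte size in the usage example are made-up values.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BinConfig:
    """Hypothetical stand-in for the MergeRecord properties discussed above."""
    min_records: int = 1
    min_bin_size: int = 0                 # bytes
    max_records: Optional[int] = None
    max_bin_size: Optional[int] = None    # bytes
    max_bin_age: Optional[float] = None   # seconds


@dataclass
class BinState:
    """Current contents of one bin."""
    records: int
    size: int      # bytes
    age: float     # seconds since the bin was created


def merge_reason(cfg: BinConfig, state: BinState) -> Optional[str]:
    """Return why the bin would be merged, or None if it keeps waiting."""
    # Max Bin Age acts as a timeout and applies regardless of the minimums.
    if cfg.max_bin_age is not None and state.age >= cfg.max_bin_age:
        return "max bin age"
    # Either maximum on its own is enough to trigger a merge.
    if cfg.max_records is not None and state.records >= cfg.max_records:
        return "max records"
    if cfg.max_bin_size is not None and state.size >= cfg.max_bin_size:
        return "max bin size"
    # Both minimums must be satisfied together.
    if state.records >= cfg.min_records and state.size >= cfg.min_bin_size:
        return "minimums reached"
    return None


# Settings from the original post: 5000 records, 5 MB, 10 s.
cfg = BinConfig(min_records=5000, min_bin_size=5 * 1024 * 1024, max_bin_age=10.0)

# A bin holding two reports (made-up totals) that has just turned 10 s old.
print(merge_reason(cfg, BinState(records=1200, size=1_800_000, age=10.0)))
```

With the minimums set far above what the three reports can ever reach, the age timeout is the only condition that can fire, which matches the "Bin has reached Max Bin Age" detail seen in provenance.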
