Tim,

Yeah, so I think you want to set it roughly like the following:

Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Correlation Attribute Name: myFlowfileAttributeName
Minimum number of entries: 2000
Maximum number of entries: 5000
Minimum group size: 1 MB
Maximum group size: 10 MB
Max bin age: 5 min
Maximum number of bins: 50
Delimiter strategy: Text
Header: [
Footer: ]
Demarcator: ,

With this configuration you should end up with all the items together
that have the same correlation attribute value in a given 5 minute
window. Once an object enters the bucket for a given value the 5 minute
timer starts. Either the minimum number of objects or size is reached
and the bin gets out right away, or the minimums are not reached and it
gets kicked out based on the 5 min timer.

Lastly, consider switching to using MergeRecord and having JSON
readers/writers in it. It will take care of the framing you're trying
to do with these demarcators.

Thanks
Joe

On Tue, Aug 28, 2018 at 1:08 PM Tim Dean <[email protected]> wrote:
>
> Thanks Joe - Your explanation makes sense.
>
> I’m now concerned that MergeContent won’t do what I want it to do. In my use
> case what I really want is to gather ALL the files that come in with a
> matching attribute value. There could be just one of them, or there could be
> a couple thousand of them. On average there will be dozens or low hundreds.
> Flowfiles with matching attribute values will tend to come in around the same
> time as each other, with some variation due to network and other issues. So
> what I really want is something like:
>
> - When I see a new value in the flow file attribute, begin a new bin
> - Allow that bucket to receive as many incoming flow files as it needs to
>   (subject to a maximum as needed to constrain memory usage)
> - When no new flow files with a matching attribute value have come in for a
>   configurable duration (e.g. 5 minutes), merge all of the bin’s contents
>   together and move it on to the next processor.
>
>
> Is there a better way to do this in NiFi?
>
> -Tim
>
> On Aug 28, 2018, at 11:15 AM, Joe Witt <[email protected]> wrote:
>
> Tim,
>
> This processor is powerful and its configurations very specific.
>
> That is a fancy way of saying this beast is complicated.
>
> First, can you highlight which version of NiFi you're using?
>
> Let's look at your settings that would cause a group of items to get
> kicked out as a merge result:
>
> 'minimum number of entries' - you have it at 1. This means once a
> given bucket contains at least one thing it is eligible/good enough to
> go. Now, on a given merge session it will put more than 1 in there
> but that will be based on how many it has pulled at once. But, still,
> you want more than 1 it sounds like.
>
> 'minimum group size' - you have it at 0. By the same logic above this
> is likely much smaller than you intended.
>
> Correlation attribute name: As Juan pointed out this should not be an
> expression language statement if you're trying to give the name of an
> attribute unless the name of the attribute you want would be the
> result of the expression language statement. This isn't consistent
> with some other cases so in hindsight we should have probably made
> that work differently.
>
> max number of bins:
> If you have ten bins currently being built up and a new one is needed
> it will kick out the oldest bin as 'good enough'. Consider making
> this larger than 10 but if you know there aren't more than 10 needed
> then you're good. You also don't want to go wild with this value
> either as it can result in more memory usage than necessary.
>
> Thanks
>
>
> On Tue, Aug 28, 2018 at 12:07 PM Tim Dean <[email protected]> wrote:
>
>
> I have a flow that sends a large number of JSON files into a MergeContent
> processor. The job of that processor is to combine all the incoming flow
> files with a particular flow file attribute into a single flow file, creating
> a JSON array containing each of the input flow files’ JSON.
>
> I have configured the MergeContent processor as follows:
>
> Merge Strategy: Bin-Packing Algorithm
> Merge Format: Binary Concatenation
> Correlation Attribute Name: ${myFlowfileAttributeName}
> Minimum number of entries: 1
> Maximum number of entries: 5000
> Minimum group size: 0 B
> Maximum group size: <no value set>
> Max bin age: 30 min
> Maximum number of bins: 10
> Delimiter strategy: Text
> Header: [
> Footer: ]
> Demarcator: ,
>
>
> When I run data through this flow, I am seeing a large number of small-ish
> merged flow files being sent to the merged relationship. I was expecting ALL
> of the files for a given flow file attribute value to be binned together, but
> they are not coming through that way. To give an example, I pushed through
> data containing 262 input JSON files. Of these 262, 2 of them have a flow
> file attribute value of ‘A’, 2 of them have a flow file attribute value of
> ‘B’, and 258 have a flow file attribute of ‘C’. I was expecting the merged
> relationship to deliver 3 flow files, one each for value A, B, and C. But I
> am seeing 24 flow files on the merged relationship, 1 for a value of A, 1 for
> a value of B, and 22 of varying sizes with the value of C.
>
> Can someone help me understand what other criteria MergeContent might be
> using to determine when to send along its merged flow files?
>
> Thanks
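
For readers following the thread, the release rules described above can be
sketched as a toy model in Python. This is not NiFi code and does not use any
NiFi API. The names Bin, is_ready, simulate and tick_s are hypothetical, and
the sketch assumes a bin counts as "good enough" only once both minimums are
met, which the thread does not spell out. Maximum entries and maximum group
size are not modeled.

```python
from dataclasses import dataclass, field


@dataclass
class Bin:
    """Toy stand-in for one bin; holds the sizes of the flow files in it."""
    created_at: float
    sizes: list = field(default_factory=list)

    def is_ready(self, now, min_entries, min_bytes, max_age_s):
        # Assumption: both minimums must be met for the bin to be "good enough".
        full_enough = len(self.sizes) >= min_entries and sum(self.sizes) >= min_bytes
        # The age timer starts when the first item enters the bin.
        expired = now - self.created_at >= max_age_s
        return full_enough or expired


def simulate(batches, min_entries, min_bytes=0, max_age_s=300.0,
             max_bins=50, tick_s=1.0):
    """Return (correlation value, entry count) for each merged output.

    Each batch is the list of (correlation value, size) pairs pulled in one
    run of the processor; tick_s is the assumed time between runs.
    """
    bins = {}
    merged = []
    now = 0.0

    def release(key):
        merged.append((key, len(bins.pop(key).sizes)))

    for batch in batches:
        for key, size in batch:
            if key not in bins:
                if len(bins) >= max_bins:
                    # A new bin is needed: kick out the oldest as "good enough".
                    release(min(bins, key=lambda k: bins[k].created_at))
                bins[key] = Bin(created_at=now)
            bins[key].sizes.append(size)
        # After each run, every bin that meets the rules is merged and sent on.
        for key in [k for k, b in bins.items()
                    if b.is_ready(now, min_entries, min_bytes, max_age_s)]:
            release(key)
        now += tick_s

    # Stand-in for the max bin age eventually expiring on whatever is left.
    for key in list(bins):
        release(key)
    return merged


batches = [
    [("A", 10), ("C", 10), ("C", 10)],
    [("B", 10), ("C", 10)],
    [("A", 10), ("B", 10), ("C", 10)],
]
print(len(simulate(batches, min_entries=1)))
print(sorted(simulate(batches, min_entries=2000)))
```

In this model, a minimum of 1 entry produces one merged output per correlation
value per run, which matches the many small merged files in the original
report. A minimum of 2000 holds each bin until the age timer fires, giving one
output per value.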
