Thanks Joe - this seems to be working for me. - Tim
Sent from my iPhone

> On Aug 28, 2018, at 1:58 PM, Joe Witt <[email protected]> wrote:
>
> Tim,
>
> I don't recall what edge/bounds checking we do on that. But your logic
> seems good.
>
> Rgr that on the MergeRecord comment.
>
> Thanks
>
>> On Tue, Aug 28, 2018 at 2:51 PM Tim Dean <[email protected]> wrote:
>>
>> Thanks Joe - I think I can make that work.
>>
>> Is there any reason why I should avoid setting the minimum and maximum
>> values (for both number of entries and group size) to the same values? That
>> way I think it would always be the timer that triggers the new flow file,
>> unless I get more input files (or a larger total amount of content size)
>> than I am configured to allow.
>>
>> FYI - I would convert to using MergeRecord, but I don’t have a schema I can
>> refer to for this data. It seems like MergeContent is the only way to handle
>> more of a “free form” JSON structure like my input data has.
>>
>> -Tim
>>
>>> On Aug 28, 2018, at 12:37 PM, Joe Witt <[email protected]> wrote:
>>>
>>> Tim
>>>
>>> Yeah, so I think you want to set it roughly like the following:
>>>
>>> Merge Strategy: Bin-Packing Algorithm
>>> Merge Format: Binary Concatenation
>>> Correlation Attribute Name: myFlowfileAttributeName
>>> Minimum number of entries: 2000
>>> Maximum number of entries: 5000
>>> Minimum group size: 1 MB
>>> Maximum group size: 10 MB
>>> Max bin age: 5 min
>>> Maximum number of bins: 50
>>> Delimiter strategy: Text
>>> Header: [
>>> Footer: ]
>>> Demarcator: ,
>>>
>>> With this configuration you should end up with all the items together
>>> that have the same correlation attribute value in a given 5 minute
>>> window. Once an object enters the bucket for a given value, the 5
>>> minute timer starts. Either the minimum number of objects or size is
>>> reached and it gets out right away, or the minimums are not reached and
>>> it will get kicked out based on the 5 min timer.
>>>
>>> Lastly, consider switching to using MergeRecord and having JSON
>>> readers/writers in it. It will take care of the framing you're trying
>>> to do with these demarcators.
>>>
>>> Thanks
>>> Joe
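As an aside for anyone reading this in the archive: with the Text delimiter strategy above, Binary Concatenation simply frames the merged content as header + contents joined by the demarcator + footer, which is what turns a pile of JSON objects into one JSON array. A rough sketch of that framing in plain Python, with made-up payloads - an illustration, not NiFi internals:

    import json

    header, footer, demarcator = "[", "]", ","
    fragments = ['{"id": 1}', '{"id": 2}', '{"id": 3}']  # hypothetical flowfile contents

    # The Text delimiter strategy amounts to this concatenation:
    merged = header + demarcator.join(fragments) + footer
    print(json.loads(merged))  # parses as one JSON array of three objects

This is also why MergeRecord with a JSON reader/writer removes the need for the header/footer/demarcator settings: the record writer emits well-formed JSON on its own.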
>>>> On Tue, Aug 28, 2018 at 1:08 PM Tim Dean <[email protected]> wrote:
>>>>
>>>> Thanks Joe - Your explanation makes sense.
>>>>
>>>> I’m now concerned that MergeContent won’t do what I want it to do. In my
>>>> use case what I really want is to gather ALL the files that come in with a
>>>> matching attribute value. There could be just one of them, or there could
>>>> be a couple thousand of them. On average there will be dozens or low
>>>> hundreds. Flowfiles with matching attribute values will tend to come in
>>>> around the same time as each other, with some variation due to network and
>>>> other issues. So what I really want is something like:
>>>>
>>>> - When I see a new value in the flow file attribute, begin a new bin.
>>>> - Allow that bin to receive as many incoming flow files as it needs to
>>>>   (subject to a maximum as needed to constrain memory usage).
>>>> - When no new flow files with a matching attribute value have come in for a
>>>>   configurable duration (e.g. 5 minutes), merge all of the bin’s contents
>>>>   together and move it on to the next processor.
>>>>
>>>> Is there a better way to do this in NiFi?
>>>>
>>>> -Tim
>>>>
>>>> On Aug 28, 2018, at 11:15 AM, Joe Witt <[email protected]> wrote:
>>>>
>>>> Tim,
>>>>
>>>> This processor is powerful and its configurations very specific.
>>>>
>>>> That is a fancy way of saying this beast is complicated.
>>>>
>>>> First, can you highlight which version of NiFi you're using?
>>>>
>>>> Let's look at your settings that would cause a group of items to get
>>>> kicked out as a merge result:
>>>>
>>>> 'Minimum number of entries' - you have it at 1. This means once a
>>>> given bucket contains at least one thing it is eligible/good enough to
>>>> go. Now, on a given merge session it will put more than 1 in there,
>>>> but that will be based on how many it has pulled at once. But, still,
>>>> it sounds like you want more than 1.
>>>>
>>>> 'Minimum group size' - you have it at 0. By the same logic above this
>>>> is likely much smaller than you intended.
>>>>
>>>> Correlation attribute name: As Juan pointed out, this should not be an
>>>> expression language statement if you're trying to give the name of an
>>>> attribute, unless the name of the attribute you want would be the
>>>> result of the expression language statement. This isn't consistent
>>>> with some other cases, so in hindsight we should probably have made
>>>> that work differently.
>>>>
>>>> Max number of bins: if you have ten bins currently being built up and
>>>> a new one is needed, it will kick out the oldest bin as 'good enough'.
>>>> Consider making this larger than 10, but if you know there aren't more
>>>> than 10 needed then you're good. You also don't want to go wild with
>>>> this value either, as it can result in more memory usage than necessary.
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Aug 28, 2018 at 12:07 PM Tim Dean <[email protected]> wrote:
>>>>
>>>> I have a flow that sends a large number of JSON files into a MergeContent
>>>> processor. The job of that processor is to combine all the incoming flow
>>>> files with a particular flow file attribute into a single flow file,
>>>> creating a JSON array containing each of the input flow files’ JSON.
>>>>
>>>> I have configured the MergeContent processor as follows:
>>>>
>>>> Merge Strategy: Bin-Packing Algorithm
>>>> Merge Format: Binary Concatenation
>>>> Correlation Attribute Name: ${myFlowfileAttributeName}
>>>> Minimum number of entries: 1
>>>> Maximum number of entries: 5000
>>>> Minimum group size: 0 B
>>>> Maximum group size: <no value set>
>>>> Max bin age: 30 min
>>>> Maximum number of bins: 10
>>>> Delimiter strategy: Text
>>>> Header: [
>>>> Footer: ]
>>>> Demarcator: ,
>>>>
>>>> When I run data through this flow, I am seeing a large number of small-ish
>>>> merged flow files being sent to the merged relationship. I was expecting
>>>> ALL of the files for a given flow file attribute value to be binned
>>>> together, but they are not coming through that way. To give an example, I
>>>> pushed through data containing 262 input JSON files. Of these 262, 2 of
>>>> them have a flow file attribute value of ‘A’, 2 of them have a flow file
>>>> attribute value of ‘B’, and 258 have a flow file attribute value of ‘C’.
>>>> I was expecting the merged relationship to deliver 3 flow files, one each
>>>> for values A, B, and C. But I am seeing 24 flow files on the merged
>>>> relationship: 1 for a value of A, 1 for a value of B, and 22 of varying
>>>> sizes with the value of C.
>>>>
>>>> Can someone help me understand what other criteria MergeContent might be
>>>> using to determine when to send along its merged flow files?
>>>>
>>>> Thanks
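A footnote on the 'Maximum number of bins' behavior Joe describes: when every correlation value needs its own bin and the limit is hit, the oldest bin is forced out as 'good enough', merged or not. A toy sketch of that eviction in plain Python (hypothetical names and payloads, not NiFi code):

    from collections import OrderedDict

    MAX_BINS = 10
    bins = OrderedDict()  # correlation value -> list of flowfile payloads

    def offer(correlation_value, payload, merged_out):
        if correlation_value not in bins:
            if len(bins) >= MAX_BINS:
                # No room for a new bin: evict the oldest one as
                # "good enough", even if it is still below the minimums.
                _, evicted = bins.popitem(last=False)
                merged_out.append(evicted)
            bins[correlation_value] = []
        bins[correlation_value].append(payload)

    merged = []
    for i in range(30):
        offer("value-%d" % (i % 12), "payload-%d" % i, merged)  # 12 values, 10 bins
    print(len(merged), "bins were forced out before filling up")

In Tim's scenario there were only three correlation values, so per Joe's explanation it is the 'Minimum number of entries: 1' setting, rather than bin eviction, that let the bin for value 'C' be flushed early and often as 22 separate merges.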
