I like what Mark suggests, but if you're not sure how many fragments a given group will have as it moves through the flow, I think you could keep doing what you were doing (binning on the groupId Correlation Attribute Name). Set the minimum number of entries and the minimum group size to some really high values, and then rely on a time-based kick-out (Max Bin Age) that waits long enough that you're confident all the pieces have come in.
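The "high minimums plus timeout" idea above can be modeled in a few lines. This is a simplified sketch, not NiFi's implementation: the class BinPackingModel and its parameters min_entries, min_size, and max_bin_age are hypothetical stand-ins for MergeContent's Minimum Number of Entries, Minimum Group Size, and Max Bin Age properties, and the current time is passed in explicitly so the timing is easy to follow.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Bin:
    """Hypothetical in-memory bin for one correlation value (e.g. a groupId)."""
    created_at: float
    entries: List[bytes] = field(default_factory=list)
    size_bytes: int = 0


class BinPackingModel:
    """Simplified model of binning by groupId with high minimums and a max bin age.

    Not NiFi code: min_entries, min_size and max_bin_age only mimic the
    MergeContent properties of similar names.
    """

    def __init__(self, min_entries: int, min_size: int, max_bin_age: float):
        self.min_entries = min_entries
        self.min_size = min_size          # bytes
        self.max_bin_age = max_bin_age    # seconds
        self.bins: Dict[str, Bin] = {}

    def offer(self, group_id: str, payload: bytes, now: float) -> None:
        """Add one payload to the bin for its group, creating the bin if needed."""
        b = self.bins.get(group_id)
        if b is None:
            b = self.bins[group_id] = Bin(created_at=now)
        b.entries.append(payload)
        b.size_bytes += len(payload)

    def release_ready(self, now: float) -> Dict[str, bytes]:
        """Merge and remove bins that met both minimums or exceeded the max age."""
        ready: Dict[str, bytes] = {}
        for gid, b in list(self.bins.items()):
            meets_minimums = (len(b.entries) >= self.min_entries
                              and b.size_bytes >= self.min_size)
            aged_out = now - b.created_at >= self.max_bin_age
            if meets_minimums or aged_out:
                ready[gid] = b"".join(b.entries)
                del self.bins[gid]
        return ready


if __name__ == "__main__":
    # Minimums set so high they are never reached; only the 300 s timeout fires.
    model = BinPackingModel(min_entries=10_000, min_size=10**9, max_bin_age=300.0)
    model.offer("42", b"a", now=0.0)
    model.offer("42", b"b", now=5.0)
    print(model.release_ready(now=60.0))   # bin is still young: {}
    print(model.release_ready(now=301.0))  # timeout reached: {'42': b'ab'}
```

In this model, a fragment that arrives after its bin has aged out simply starts a new bin for the same groupId, which is why the timeout has to be generous enough to cover the slowest fragment.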
This is a great use case, and one that requires a lot of thought on how best to tackle it!

On Wed, Aug 10, 2016 at 11:00 AM, Mark Payne wrote:

> Michael,
>
> In the MergeContent processor, you can set the "Merge Strategy" to "Defragment." This will tell MergeContent to determine its bin thresholds based on the following FlowFile attributes:
>
> fragment.identifier
> fragment.index
> fragment.count
>
> So you'd need to set those 3 attributes on each of the FlowFiles. Rather than using the Correlation Attribute Name, you'd set the "fragment.identifier" attribute (you can use UpdateAttribute to copy the value from the groupId attribute to the "fragment.identifier" attribute if you need to).
>
> The "fragment.index" attribute tells MergeContent how to order the different FlowFiles in the merged bin.
>
> The "fragment.count" attribute tells MergeContent how many FlowFiles go into this bin.
>
> Does that all make sense?
>
> Thanks
> -Mark
>
> > On Aug 10, 2016, at 10:54 AM, Michael Xu wrote:
> >
> > I am sending payloads into the MergeContent processor, each of which belongs to a certain group of files in the data I'm working with. Each payload has an attribute called "groupId", which is an identification number for a particular group of files. This is the attribute I'm using to bin each incoming FlowFile, and I have set the Correlation Attribute Name to groupId.
> >
> > The problem I'm dealing with right now is that each groupId has a varying number of files associated with it. As such, I'm not sure how, in NiFi, to detect when the MergeContent processor has received all files for a particular groupId and, once it has, release the bin.
> >
> > Any help with this problem is appreciated, thanks!
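Mark's Defragment suggestion can be sketched as a small in-memory model. This is a hypothetical Python illustration, not NiFi code: each FlowFile is an (attributes, content) pair, the helper copy_group_id mimics an UpdateAttribute rule that sets fragment.identifier to ${groupId}, and a bin is released once it holds fragment.count entries, concatenated in fragment.index order. It only works if fragment.count is already known when each FlowFile is created, which is exactly the information Michael says he doesn't have.

```python
from collections import defaultdict
from typing import Dict, Optional


def copy_group_id(attributes: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical helper mimicking UpdateAttribute: fragment.identifier = ${groupId}."""
    out = dict(attributes)
    out["fragment.identifier"] = out["groupId"]
    return out


class DefragmentModel:
    """Simplified model of MergeContent's Defragment strategy (not NiFi code).

    Bins are keyed by 'fragment.identifier'; a bin is complete once it holds
    'fragment.count' distinct indices, and it is merged in 'fragment.index' order.
    """

    def __init__(self) -> None:
        # identifier -> {index: content}
        self.bins: Dict[str, Dict[int, bytes]] = defaultdict(dict)

    def offer(self, attributes: Dict[str, str], content: bytes) -> Optional[bytes]:
        """Add one FlowFile; return the merged bytes if its bin just completed, else None."""
        ident = attributes["fragment.identifier"]
        index = int(attributes["fragment.index"])
        count = int(attributes["fragment.count"])
        fragments = self.bins[ident]
        fragments[index] = content
        if len(fragments) < count:
            return None  # still waiting for the rest of this group
        del self.bins[ident]
        return b"".join(fragments[i] for i in sorted(fragments))


if __name__ == "__main__":
    model = DefragmentModel()
    common = {"groupId": "7", "fragment.count": "3"}
    # Fragments may arrive out of order; index order is restored on merge.
    for index, body in [("2", b"C"), ("0", b"A"), ("1", b"B")]:
        merged = model.offer(copy_group_id({**common, "fragment.index": index}), body)
        print(index, merged)
```

Because completion is driven by fragment.count rather than by a timer, the bin is released as soon as the last fragment lands, with no need to guess a safe timeout.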
