JFYI, updated the template on Gist. https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
I personally prefer the 2nd work-around, "setting 'Releasable FlowFile Count' to 0", because the resulting FlowFiles will have more informative attributes as evidence. On Wed, Jun 6, 2018 at 4:50 PM, Koji Kawamura <[email protected]> wrote: > Hi Martijn, > > Thanks for sharing the new information. > Here are a couple of things to help with debugging. > > # Debug Notify branch > 1. Stop the Wait branch, to debug the Notify branch function in isolation. The Wait > processor deletes the cache entry when it thinks there is no need to keep it any > longer. > 2. Make sure Notify has 'success' and 'failure' relationships. Connect > both relationships to a stopped LogAttribute (or similar), to keep > FlowFiles in the queue between it and the Notify. > 3. Confirm every arriving FlowFile is passed to the 'success' > relationship. This confirms Notify actually sent every notification > with the expected notification identifier. > 4. Check that all expected keys exist in Redis. > > # Debug Notify branch 2 > If you can't stop the Wait branch for some reason, add a > FetchDistributedMapCache right after Notify. This confirms that the > signal was successfully written to Redis. > > If we can confirm the Notify branch works without issue, then I'd suspect > the once-written key gets deleted somehow by the Wait processors. > > There is a race condition between Wait and Notify. > My hypothesis is: > Assume 'type1.ext1' is routed to the Wait branch, and the other 7 types are > routed to the Notify branch. > 1. Notify receives 'type1.ext2'; the signal becomes {type1.ext2=1} > 2. Wait for 'type1.ext2' gets a signal, which has counts {type1.ext2=1} > 3. Simultaneously, Notify for 'type2.ext1' notifies; the signal is > updated to {type1.ext2=1, type2.ext1=1} > 4. Wait for 'type1.ext2' processes the signal; since the count > reached the target of 1, it decrements the count to 0. Then it deletes the > key, because it thinks the signal is done, as it doesn't have any > count left in it. > 5. Wait for 'type2.ext1' fetches the key, but the entry is already > deleted. 
And it gets stuck in the 'wait' relationship. > > If that's the case, changing 'Signal Identifier' from groupId to > groupId.type can avoid the conflict. > Alternatively, setting 'Releasable FlowFile Count' to 0 can stop Wait > from deleting the cache key. > > Thanks, > Koji > > > > On Tue, Jun 5, 2018 at 9:23 PM, Martijn Dekkers <[email protected]> > wrote: >> Hi Koji, >> >> Some more information from debugging. >> >> I have today deployed Redis, since that gives me an easy interface to check >> the existence of keys, and found that for files that end up stuck in the >> wait queues, the provenance in the Notify queue shows the relevant flowfile >> as having arrived, but the relevant key in Redis shows as (nil). >> >> Files that have been processed successfully show a "good" key in Redis. >> >> Thanks, >> >> Martijn >> >> On 5 June 2018 at 06:27, Martijn Dekkers <[email protected]> wrote: >>> >>> Hello Koji, >>> >>> Many thanks, apologies for the delay in responding - I had to work on some >>> different tasks. >>> >>> I have followed your advice and have configured a flow accordingly, and on >>> the whole the logic works. However, I still see the issue where a set will >>> be stuck in the wait queue. I have tracked it down to instances where >>> there is a longer delay between the arrival of ext1 and ext2 files. If I >>> pause the appropriate processor that gates the ext2 files, that set will get >>> stuck. If all files come through at roughly the same time, we see no >>> issues, and the flow happily runs. >>> >>> I am not quite sure about the best way to debug this. I have looked at the >>> attributes in provenance, and notice that the relevant counter for the >>> specific wait processor isn't updated. I am not sure how I can check the >>> status of the distributed map cache to see if this might be responsible. 
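The race Koji hypothesizes in the numbered steps above can be sketched as a toy Python model. This is not NiFi's implementation; the function names and the delete-on-empty behavior are simplifications of the hypothesis, intended only to show how a Wait that deletes the cache key after releasing can drop a concurrent Notify for a different counter under the same signal identifier:

```python
# Toy model of the hypothesized Wait/Notify race (NOT NiFi's actual code).
cache = {}  # signal identifier -> {counter name: count}

def notify(signal_id, counter):
    """Notify increments a counter under the signal identifier."""
    cache.setdefault(signal_id, {})
    cache[signal_id][counter] = cache[signal_id].get(counter, 0) + 1

def wait_release(signal_id, snapshot, counter, target=1, releasable=1):
    """Wait processes a previously fetched snapshot of the signal.
    With releasable > 0 it decrements the count and, if no counts are
    left in ITS snapshot, deletes the whole key (the risky step)."""
    if snapshot.get(counter, 0) < target:
        return "wait"
    if releasable:
        snapshot[counter] -= releasable
        if not any(snapshot.values()):
            cache.pop(signal_id, None)  # delete-on-empty wipes other counters
        else:
            cache[signal_id] = snapshot
    return "released"

# 1. Notify for type1.ext2 arrives
notify("123", "type1.ext2")
# 2. Wait for type1.ext2 fetches the signal (a snapshot, i.e. a stale read)
snap = dict(cache["123"])
# 3. Simultaneously, Notify for type2.ext1 updates the shared signal
notify("123", "type2.ext1")
# 4. Wait processes its stale snapshot, releases, and deletes the key
wait_release("123", snap, "type1.ext2")
# 5. Wait for type2.ext1 now finds no signal: the notification is lost
print("123" in cache)  # False -> type2.ext1's FlowFile stays in 'wait'
```

With 'Releasable FlowFile Count' set to 0 (the second workaround), the `if releasable:` branch is never taken, so the key is never deleted and the concurrent counter survives.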
>>> >>> I can share my flowfile, but I would have to email it to you directly; >>> unfortunately I cannot share the flowfile publicly, and sanitising it to the >>> extent that I can publicly share it would be difficult. >>> >>> Oh, we are using 1.6 >>> >>> Many thanks, >>> >>> Martijn >>> >>> On 31 May 2018 at 09:57, Koji Kawamura <[email protected]> wrote: >>>> >>>> BTW, which version are you using? I hope it is 1.4.0 or higher. There >>>> was an issue affecting your usage. >>>> https://issues.apache.org/jira/browse/NIFI-4028 >>>> >>>> On Thu, May 31, 2018 at 4:51 PM, Koji Kawamura <[email protected]> >>>> wrote: >>>> > Hi Martijn, >>>> > >>>> > I used the filename increment pattern based on your first filename >>>> > example. >>>> > file_123_456_ab.ex1 >>>> > I increment the 456 part. If it changed, that's fine. >>>> > >>>> > Your current configurations look like below: >>>> > - Given a filename: file_123_type3.ext1 >>>> > - matched with a RegEx: ^file_(\d{3}_)(\w+)\.ext1$ >>>> > - groupID will be: 123_ (including the underscore) >>>> > - counterName will be: type3 >>>> > >>>> > I was suggesting including the extension in the counterName. >>>> > How about changing the RegEx to: >>>> > - RegEx: ^file_(\d+)_(\w+\.ext\d)$ >>>> > - groupID will be: 123 >>>> > - counterName will be: type3.ext1 >>>> > >>>> > Then you can route type1.ext1 to the Wait branch and the other 7 to Notify. >>>> > In the Wait branch, you need 7 Wait processors. >>>> > >>>> > It would be faster to debug if you could share your flow template. >>>> > >>>> > Thanks, >>>> > Koji >>>> > >>>> > On Thu, May 31, 2018 at 3:15 PM, Martijn Dekkers >>>> > <[email protected]> wrote: >>>> >> Thank you Koji, >>>> >> >>>> >> I have tried once again, using your updated example. Unfortunately, >>>> >> things >>>> >> still get stuck in the first Wait processor's wait queue. >>>> >> I did notice that the format of the files your example generates is >>>> >> different. 
I will try to clarify: >>>> >> >>>> >> - 8 files in total: >>>> >> >>>> >> -- file_123_type1.ext1 >>>> >> -- file_123_type1.ext2 >>>> >> >>>> >> -- file_123_type2.ext1 >>>> >> -- file_123_type2.ext2 >>>> >> >>>> >> -- file_123_type3.ext1 >>>> >> -- file_123_type3.ext2 >>>> >> >>>> >> -- file_123_type4.ext1 >>>> >> -- file_123_type4.ext2 >>>> >> >>>> >> For each set of 8 files, "file_123" increments, so the first set of 8 >>>> >> is >>>> >> "file_123", and the next set is "file_124" and so on. >>>> >> >>>> >> When I look at your example, I notice that at the final step >>>> >> (LogAttribute >>>> >> after the FetchFile set) the filenames are file_123_<incrementing >>>> >> number>.ex(1|2) >>>> >> >>>> >> My UpdateAttribute before the Notify branch is configured as: >>>> >> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')} >>>> >> counterName - >>>> >> ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$2')} >>>> >> >>>> >> The UpdateAttribute before the Wait branch is configured as: >>>> >> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')} >>>> >> >>>> >> The 4 Wait processors in the Wait branch are configured as: >>>> >> All Wait processors: >>>> >> Release Signal Identifier - ${groupID} >>>> >> >>>> >> For each individual Wait processor: >>>> >> Signal Counter Name - type1 >>>> >> Signal Counter Name - type2 >>>> >> Signal Counter Name - type3 >>>> >> Signal Counter Name - type4 >>>> >> >>>> >> I am a bit stumped. The best success we had was a configuration with a >>>> >> RouteonAttribute sending each of type1|type2|type3|type4 to their own >>>> >> Wait >>>> >> processor, and a similar config for the Notify branch, followed by a >>>> >> final >>>> >> Wait/Notify pair that simply ensures we have the correct amount of >>>> >> sets. >>>> >> >>>> >> This configuration did exactly what we want, but unfortunately we had >>>> >> random >>>> >> flowfiles stuck in the waitqueue for no apparent reason. 
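The two RegEx variants discussed in this thread can be compared quickly outside NiFi. NiFi's Expression Language uses Java regular expressions, but for these particular patterns Python's re module behaves identically, so a sketch like this is enough to see what groupID and counterName each one extracts:

```python
import re

# Pattern currently configured in the UpdateAttribute processors:
# groupID keeps the trailing underscore, counterName drops the extension.
current = re.compile(r"^file_(\d{3}_)(\w+)\.ext1$")
# Koji's suggested pattern: clean groupID, extension folded into counterName.
suggested = re.compile(r"^file_(\d+)_(\w+\.ext\d)$")

m = current.match("file_123_type3.ext1")
print(m.group(1), m.group(2))    # prints: 123_ type3

m = suggested.match("file_123_type3.ext1")
print(m.group(1), m.group(2))    # prints: 123 type3.ext1

# The suggested pattern also distinguishes the two extensions, so all
# eight files in a set map to eight distinct counter names:
for name in ("file_123_type3.ext1", "file_123_type3.ext2"):
    group_id, counter = suggested.match(name).groups()
    print(group_id, counter)
```

The same groups would come out of the Expression Language calls in the thread, e.g. `${fileName:replaceFirst('^file_(\d+)_(\w+\.ext\d)$','$1')}` for groupID and `'$2'` for counterName.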
>>>> >> >>>> >> Thanks, >>>> >> >>>> >> Martijn >>>> >> >>>> >> >>>> >> >>>> >> On 31 May 2018 at 05:23, Koji Kawamura <[email protected]> wrote: >>>> >>> >>>> >>> The order of arrival does not matter. >>>> >>> >>>> >>> Wait processor has 'Expiration Duration' configuration, defaults to >>>> >>> 10 >>>> >>> min. Please adjust it according to your needs, the longest period to >>>> >>> wait for a delayed file. >>>> >>> If a FlowFile exceeds the duration, it will be sent to 'expired' >>>> >>> relationship, and can be treated differently, e.g. write ERROR log >>>> >>> >>>> >>> > If we have a longer wait for a file, we'd like processing for the >>>> >>> > next >>>> >>> > groupid to still be able to continue. >>>> >>> >>>> >>> In order to achieve that, use Wait Mode = 'Transfer to wait >>>> >>> relationship', and the 'wait' relationship should be configured to >>>> >>> use >>>> >>> FirstInFirstOutPrioritizer. >>>> >>> If FirstInFirstOutPrioritizer is not set, the same FlowFile will be >>>> >>> processed again while it blocks other FlowFiles. >>>> >>> With FirstInFirstOutPrioritizer, the processed FlowFile will be >>>> >>> re-queued at the end of wait queue. >>>> >>> >>>> >>> I've updated my example to make it more realistic, by adding delay >>>> >>> for >>>> >>> certain set and type. >>>> >>> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd >>>> >>> >>>> >>> Thanks, >>>> >>> Koji >>>> >>> >>>> >>> On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers >>>> >>> <[email protected]> wrote: >>>> >>> > Cool, that will make things a lot simpler. Does it matter that the >>>> >>> > ext2 >>>> >>> > files arrive in random order? Sometimes there can be a very long >>>> >>> > delay >>>> >>> > in >>>> >>> > some of them showing up, and we have some concerns about the >>>> >>> > overall >>>> >>> > flow >>>> >>> > blocking. If we have a longer wait for a file, we'd like processing >>>> >>> > for >>>> >>> > the >>>> >>> > next groupid to still be able to continue. 
>>>> >>> > >>>> >>> > Thank you for your help (and for writing Wait/Notify!!) >>>> >>> > >>>> >>> > Martijn >>>> >>> > >>>> >>> > On 31 May 2018 at 03:49, Koji Kawamura <[email protected]> >>>> >>> > wrote: >>>> >>> >> >>>> >>> >> Glad to hear that was helpful. >>>> >>> >> >>>> >>> >> "4 same type for each extension", can be treated as "8 distinct >>>> >>> >> types" >>>> >>> >> if an extension is included in a type. >>>> >>> >> ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2 >>>> >>> >> >>>> >>> >> Then route only 'ab.ex1' (or whichever but just 1 of them) to the >>>> >>> >> Wait >>>> >>> >> branch, and the rest to Notify branch. >>>> >>> >> That will simplify the flow, if I'm not missing any other >>>> >>> >> requirement. >>>> >>> >> >>>> >>> >> Thanks! >>>> >>> >> Koji >>>> >>> >> >>>> >>> >> On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers >>>> >>> >> <[email protected]> wrote: >>>> >>> >> > Hi Koji, Many thanks for your continued assistance! >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> - 1 file per second is relatively low in terms of traffic, it >>>> >>> >> >> should >>>> >>> >> >> be processed fine with 1 thread >>>> >>> >> >> - A flow like this, which is stateful across different parts of >>>> >>> >> >> the >>>> >>> >> >> flow works at best with single thread, because using multiple >>>> >>> >> >> threads >>>> >>> >> >> would cause race condition or concurrency issue if there's any >>>> >>> >> >> implementation error >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > Yes, we had similar thoughts. >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> - Based on above, I strongly recommend to NOT increase >>>> >>> >> >> "concurrent >>>> >>> >> >> tasks". 
If you see FlowFiles staying in a wait queue, then >>>> >>> >> >> there >>>> >>> >> >> must >>>> >>> >> >> be different issue >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > We don't see many flowfiles stuck in a wait queue, I ran a test >>>> >>> >> > over >>>> >>> >> > a >>>> >>> >> > few >>>> >>> >> > hours yesterday that simulates the way in which these files >>>> >>> >> > would >>>> >>> >> > appear >>>> >>> >> > (we >>>> >>> >> > would have 4 of "ext1" show up every second, and the "ext2" can >>>> >>> >> > show >>>> >>> >> > up >>>> >>> >> > a >>>> >>> >> > few seconds later, and not always in the same order) and we >>>> >>> >> > found >>>> >>> >> > perhaps 6 >>>> >>> >> > flowfiles stuck in a wait queue. >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> - Also, using concurrent tasks number like 400 is too much in >>>> >>> >> >> general >>>> >>> >> >> for all processors. I recommend to increment it as 2, 3, 4 .. >>>> >>> >> >> up to >>>> >>> >> >> 8 >>>> >>> >> >> or so, only if you see the clear benefit by doing so >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > Indeed, thanks for the suggestion. Once we have the logic >>>> >>> >> > finished >>>> >>> >> > and >>>> >>> >> > tested we will have to optimise this Flow. The next step is to >>>> >>> >> > try to >>>> >>> >> > load >>>> >>> >> > the required processors into MiNiFy, as this will be running on >>>> >>> >> > many >>>> >>> >> > systems >>>> >>> >> > with limited capacity. If we don't manage with MiNiFy, we will >>>> >>> >> > still >>>> >>> >> > be >>>> >>> >> > good, but we prefer to have the smaller footprint and ease of >>>> >>> >> > management >>>> >>> >> > we >>>> >>> >> > can obtain with MiNiFy. >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> - The important part of this flow is extracting 'groupId' and >>>> >>> >> >> 'type' >>>> >>> >> >> from file names. Regular Expression needs to be configured >>>> >>> >> >> properly. 
>>>> >>> >> >> - I recommend using https://regex101.com/ to test your Regular >>>> >>> >> >> Expression to see whether it can extract correct groupId and >>>> >>> >> >> type >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > Yes, we have tested our RegExes for this extensively >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> >>>> >>> >> >> Lastly, regardless of how many files should be there for 'ext1' >>>> >>> >> >> and >>>> >>> >> >> 'ext2', the flow structure is simple as below. >>>> >>> >> >> Let's say there should be 8 files to start processing those. >>>> >>> >> >> 4 x ex1, and 4 ex2 in your case, but let's think it as 8 file >>>> >>> >> >> types. >>>> >>> >> >> And I assume the types are known, meaning, static, not >>>> >>> >> >> dynamically >>>> >>> >> >> change. >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > Correct, the format is <groupID><type>.<ext> where: >>>> >>> >> > >>>> >>> >> > - groupId is unique for each set of 8 >>>> >>> >> > - type has 4 variants (ab, cd, ef, gh), the same type-set for >>>> >>> >> > each >>>> >>> >> > ext >>>> >>> >> > >>>> >>> >> >> So, the rule is, "a set of files consists of 8 files, and a set >>>> >>> >> >> should >>>> >>> >> >> wait to be processed until all 8 files are ready", that's all. >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > For our use case it is important that we have positive >>>> >>> >> > identification >>>> >>> >> > that >>>> >>> >> > we have exact "positive identification" of each file. >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> Then, the flow should be designed like below: >>>> >>> >> >> 1. List files, each file will be sent as a FlowFile >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > Correct - we have several different listfiles for other sections >>>> >>> >> > of >>>> >>> >> > the >>>> >>> >> > flow, we are actually collecting many different sets, all >>>> >>> >> > variants of >>>> >>> >> > the >>>> >>> >> > above. 
However, those are far simpler (sets of 2 - ext1 and ext2 >>>> >>> >> > only) >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> 2. Extract groupId and type from filename >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > Correct >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> 3. Route FlowFiles into two branches, let's call these 'Notify' >>>> >>> >> >> branch >>>> >>> >> >> and 'Wait' branch, and pass only 1 type for a set to >>>> >>> >> >> Wait-branch, >>>> >>> >> >> and >>>> >>> >> >> the rest 7 types to Notify-branch >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > OK, we currently split Notify branch to "all ext1" and wait >>>> >>> >> > branch to >>>> >>> >> > "all >>>> >>> >> > ext2" >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> At Notify branch (for the rest 7 types FlowFile, e.g. type 2, >>>> >>> >> >> 3, 4 >>>> >>> >> >> ... >>>> >>> >> >> 8) >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > As mentioned, we only have 4 distinct types. >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> 1. Notify that the type for a group has arrived. >>>> >>> >> >> 2. Discard the FlowFile, because there's nothing to do with it >>>> >>> >> >> in >>>> >>> >> >> this >>>> >>> >> >> branch >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> At Wait branch (for the type 1 FlowFile) >>>> >>> >> >> 1. Wait for type 2 for the groupId. >>>> >>> >> >> 2. Wait for type 3 for the groupId, type 4, 5 and so on >>>> >>> >> >> 3. After passing Wait for type 8, it can guarantee that all 8 >>>> >>> >> >> files >>>> >>> >> >> are available (unless there is any other program deleting >>>> >>> >> >> those) >>>> >>> >> >> 4. Get actual file content using FetchFile, and process it >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > Besides the "4 same types for each extension", this is >>>> >>> >> > configured as >>>> >>> >> > you >>>> >>> >> > describe. >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> I hope it helps. >>>> >>> >> >> >>>> >>> >> > >>>> >>> >> > It does, thanks. 
I will extract this portion of the flow, >>>> >>> >> > sanitise, >>>> >>> >> > and >>>> >>> >> > send >>>> >>> >> > it along - easier to see than to describe :) >>>> >>> >> > >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> Thanks, >>>> >>> >> >> Koji >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > Thank you so much once again! >>>> >>> >> > >>>> >>> >> > Martijn >>>> >>> >> > >>>> >>> >> > >>>> >>> >> > >>>> >>> >> >> >>>> >>> >> >> >>>> >>> >> >> On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers >>>> >>> >> >> <[email protected]> >>>> >>> >> >> wrote: >>>> >>> >> >> > Hey Pierre, >>>> >>> >> >> > >>>> >>> >> >> > Yes, we suspected as much, but we are only seeing this with >>>> >>> >> >> > the >>>> >>> >> >> > Wait >>>> >>> >> >> > processor. Possibly because that is the only "blocking" we >>>> >>> >> >> > have in >>>> >>> >> >> > this >>>> >>> >> >> > flow. >>>> >>> >> >> > >>>> >>> >> >> > Thanks for the clarification, much appreciated! >>>> >>> >> >> > >>>> >>> >> >> > Martijn >>>> >>> >> >> > >>>> >>> >> >> > On 30 May 2018 at 10:30, Pierre Villard >>>> >>> >> >> > <[email protected]> >>>> >>> >> >> > wrote: >>>> >>> >> >> >> >>>> >>> >> >> >> I'll let Koji give more information about the Wait/Notify, >>>> >>> >> >> >> he is >>>> >>> >> >> >> clearly >>>> >>> >> >> >> the expert here. >>>> >>> >> >> >> >>>> >>> >> >> >> I'm just jumping in regarding your "and when viewing the >>>> >>> >> >> >> queue, >>>> >>> >> >> >> the >>>> >>> >> >> >> dialog >>>> >>> >> >> >> states that the queue is empty.". You're seeing this >>>> >>> >> >> >> behavior >>>> >>> >> >> >> because, >>>> >>> >> >> >> even >>>> >>> >> >> >> though the UI shows some flow files in the queue, the flow >>>> >>> >> >> >> files >>>> >>> >> >> >> are >>>> >>> >> >> >> currently locked in the session of the running processor and >>>> >>> >> >> >> you >>>> >>> >> >> >> won't >>>> >>> >> >> >> see >>>> >>> >> >> >> flow files currently processed in a session when listing a >>>> >>> >> >> >> queue. 
>>>> >>> >> >> >> If >>>> >>> >> >> >> you >>>> >>> >> >> >> stop the processor, the session will be closed and you'll be >>>> >>> >> >> >> able >>>> >>> >> >> >> to >>>> >>> >> >> >> list >>>> >>> >> >> >> the queue and see the flow files. >>>> >>> >> >> >> >>>> >>> >> >> >> I recall discussions in the past to improve the UX for this. >>>> >>> >> >> >> Not >>>> >>> >> >> >> sure >>>> >>> >> >> >> we >>>> >>> >> >> >> have a JIRA for it though... >>>> >>> >> >> >> >>>> >>> >> >> >> Pierre >>>> >>> >> >> >> >>>> >>> >> >> >> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers >>>> >>> >> >> >> <[email protected]>: >>>> >>> >> >> >>> >>>> >>> >> >> >>> Hi Koji, >>>> >>> >> >> >>> >>>> >>> >> >> >>> Thank you for responding. I had adjusted the run schedule >>>> >>> >> >> >>> to >>>> >>> >> >> >>> closely >>>> >>> >> >> >>> mimic our environment. We are expecting about 1 file per >>>> >>> >> >> >>> second >>>> >>> >> >> >>> or >>>> >>> >> >> >>> so. >>>> >>> >> >> >>> We are also seeing some random "orphans" sitting in a wait >>>> >>> >> >> >>> queue >>>> >>> >> >> >>> every >>>> >>> >> >> >>> now and again that don't trigger a debug message, and when >>>> >>> >> >> >>> viewing >>>> >>> >> >> >>> the >>>> >>> >> >> >>> queue, the dialog states that the queue is empty. >>>> >>> >> >> >>> >>>> >>> >> >> >>> We found the random "no signal found" issue to be >>>> >>> >> >> >>> significantly >>>> >>> >> >> >>> decreased >>>> >>> >> >> >>> when we increase the "concurrent tasks" to something large >>>> >>> >> >> >>> - >>>> >>> >> >> >>> currently >>>> >>> >> >> >>> set >>>> >>> >> >> >>> to 400 for all wait and notify processors. 
>>>> >>> >> >> >>> >>>> >>> >> >> >>> I do need to mention that our requirements had changed >>>> >>> >> >> >>> since you >>>> >>> >> >> >>> made >>>> >>> >> >> >>> the >>>> >>> >> >> >>> template, in that we are looking for a set of 8 files - 4 x >>>> >>> >> >> >>> "ext1" >>>> >>> >> >> >>> and >>>> >>> >> >> >>> 4 x >>>> >>> >> >> >>> "ext2" both with the same pattern: <groupid><type (4 of >>>> >>> >> >> >>> these)>.ext1 >>>> >>> >> >> >>> or ext2 >>>> >>> >> >> >>> >>>> >>> >> >> >>> We found that the best way to make this work was to add >>>> >>> >> >> >>> another >>>> >>> >> >> >>> wait/notify pair, each processor coming after the ones >>>> >>> >> >> >>> already >>>> >>> >> >> >>> there, >>>> >>> >> >> >>> with a >>>> >>> >> >> >>> simple counter against the groupID. >>>> >>> >> >> >>> >>>> >>> >> >> >>> I will export a template for you, many thanks for your help >>>> >>> >> >> >>> - I >>>> >>> >> >> >>> just >>>> >>> >> >> >>> need >>>> >>> >> >> >>> to spend some time sanitising the varies fields etc. >>>> >>> >> >> >>> >>>> >>> >> >> >>> Many thanks once again for your kind assistance. >>>> >>> >> >> >>> >>>> >>> >> >> >>> Martijn >>>> >>> >> >> >>> >>>> >>> >> >> >>> On 30 May 2018 at 08:14, Koji Kawamura >>>> >>> >> >> >>> <[email protected]> >>>> >>> >> >> >>> wrote: >>>> >>> >> >> >>>> >>>> >>> >> >> >>>> Hi Martjin, >>>> >>> >> >> >>>> >>>> >>> >> >> >>>> In my template, I was using 'Run Schedule' as '5 secs' for >>>> >>> >> >> >>>> the >>>> >>> >> >> >>>> Wait >>>> >>> >> >> >>>> processors to avoid overusing CPU resource. However, if >>>> >>> >> >> >>>> you >>>> >>> >> >> >>>> expect >>>> >>> >> >> >>>> more throughput, it should be lowered. 
>>>> >>> >> >> >>>> Changed Run Schedule to 0 sec, and I passed 100 group of >>>> >>> >> >> >>>> files >>>> >>> >> >> >>>> (400 >>>> >>> >> >> >>>> files because 4 files are 1 set in my example), they >>>> >>> >> >> >>>> reached to >>>> >>> >> >> >>>> the >>>> >>> >> >> >>>> expected goal of the flow without issue. >>>> >>> >> >> >>>> >>>> >>> >> >> >>>> If you can share your flow and example input file volume >>>> >>> >> >> >>>> (hundreds >>>> >>> >> >> >>>> of >>>> >>> >> >> >>>> files were fine in my flow), I may be able to provide more >>>> >>> >> >> >>>> useful >>>> >>> >> >> >>>> comment. >>>> >>> >> >> >>>> >>>> >>> >> >> >>>> Thanks, >>>> >>> >> >> >>>> Koji >>>> >>> >> >> >>>> >>>> >>> >> >> >>>> On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers >>>> >>> >> >> >>>> <[email protected]> wrote: >>>> >>> >> >> >>>> > Hi Koji, >>>> >>> >> >> >>>> > >>>> >>> >> >> >>>> > I am seeing many issues to get this to run reliably. >>>> >>> >> >> >>>> > When >>>> >>> >> >> >>>> > running >>>> >>> >> >> >>>> > this >>>> >>> >> >> >>>> > with >>>> >>> >> >> >>>> > a few flowfiles at a time, and stepping through by >>>> >>> >> >> >>>> > switching >>>> >>> >> >> >>>> > processors on >>>> >>> >> >> >>>> > and off it works mostly fine, but running this at volume >>>> >>> >> >> >>>> > I >>>> >>> >> >> >>>> > receive >>>> >>> >> >> >>>> > many >>>> >>> >> >> >>>> > errors about "no release signal found" >>>> >>> >> >> >>>> > >>>> >>> >> >> >>>> > I have tried to fix this in a few different ways, but >>>> >>> >> >> >>>> > the >>>> >>> >> >> >>>> > issue >>>> >>> >> >> >>>> > keeps >>>> >>> >> >> >>>> > coming >>>> >>> >> >> >>>> > back. This is also not consistent at all - different >>>> >>> >> >> >>>> > wait >>>> >>> >> >> >>>> > processors >>>> >>> >> >> >>>> > will >>>> >>> >> >> >>>> > block different flowfiles at different times, without >>>> >>> >> >> >>>> > changing >>>> >>> >> >> >>>> > any >>>> >>> >> >> >>>> > configuration. 
Stop/Start the flow, and different queues >>>> >>> >> >> >>>> > will >>>> >>> >> >> >>>> > fill >>>> >>> >> >> >>>> > up. >>>> >>> >> >> >>>> > Do >>>> >>> >> >> >>>> > you have any ideas what could be causing this behavior? >>>> >>> >> >> >>>> > I >>>> >>> >> >> >>>> > checked >>>> >>> >> >> >>>> > the >>>> >>> >> >> >>>> > DistributedMapCache Server/Client components, and they >>>> >>> >> >> >>>> > all >>>> >>> >> >> >>>> > appear >>>> >>> >> >> >>>> > to >>>> >>> >> >> >>>> > be >>>> >>> >> >> >>>> > working OK. >>>> >>> >> >> >>>> > >>>> >>> >> >> >>>> > Thanks, >>>> >>> >> >> >>>> > >>>> >>> >> >> >>>> > Martijn >>>> >>> >> >> >>>> > >>>> >>> >> >> >>>> > On 28 May 2018 at 05:11, Koji Kawamura >>>> >>> >> >> >>>> > <[email protected]> >>>> >>> >> >> >>>> > wrote: >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> Hi Martin, >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> Alternative approach is using Wait/Notify processors. >>>> >>> >> >> >>>> >> I have developed similar flow using those before, and >>>> >>> >> >> >>>> >> it >>>> >>> >> >> >>>> >> will >>>> >>> >> >> >>>> >> work >>>> >>> >> >> >>>> >> with your case I believe. >>>> >>> >> >> >>>> >> A NiFi flow template is available here. >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> Hope this helps, >>>> >>> >> >> >>>> >> Koji >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> >>>> >>> >> >> >>>> >> On Sun, May 27, 2018 at 11:48 PM, Andrew Grande >>>> >>> >> >> >>>> >> <[email protected]> >>>> >>> >> >> >>>> >> wrote: >>>> >>> >> >> >>>> >> > Martijn, >>>> >>> >> >> >>>> >> > >>>> >>> >> >> >>>> >> > Here's an idea you could explore. 
Have the ListFile >>>> >>> >> >> >>>> >> > processor >>>> >>> >> >> >>>> >> > work >>>> >>> >> >> >>>> >> > as >>>> >>> >> >> >>>> >> > usual >>>> >>> >> >> >>>> >> > and create a custom component (start with a scripting >>>> >>> >> >> >>>> >> > one >>>> >>> >> >> >>>> >> > to >>>> >>> >> >> >>>> >> > prototype) >>>> >>> >> >> >>>> >> > grouping the filenames as needed. I don't know of the >>>> >>> >> >> >>>> >> > number >>>> >>> >> >> >>>> >> > of >>>> >>> >> >> >>>> >> > files in >>>> >>> >> >> >>>> >> > a >>>> >>> >> >> >>>> >> > set is different every time, so trying to be more >>>> >>> >> >> >>>> >> > robust. >>>> >>> >> >> >>>> >> > >>>> >>> >> >> >>>> >> > Once you group and count the set, you can transfer >>>> >>> >> >> >>>> >> > the >>>> >>> >> >> >>>> >> > names >>>> >>> >> >> >>>> >> > to >>>> >>> >> >> >>>> >> > the >>>> >>> >> >> >>>> >> > success >>>> >>> >> >> >>>> >> > relationship. Ignore otherwise and wait until the set >>>> >>> >> >> >>>> >> > is >>>> >>> >> >> >>>> >> > full. >>>> >>> >> >> >>>> >> > >>>> >>> >> >> >>>> >> > Andrew >>>> >>> >> >> >>>> >> > >>>> >>> >> >> >>>> >> > >>>> >>> >> >> >>>> >> > On Sun, May 27, 2018, 7:29 AM Martijn Dekkers >>>> >>> >> >> >>>> >> > <[email protected]> >>>> >>> >> >> >>>> >> > wrote: >>>> >>> >> >> >>>> >> >> >>>> >>> >> >> >>>> >> >> Hello all, >>>> >>> >> >> >>>> >> >> >>>> >>> >> >> >>>> >> >> I am trying to work out an issue with little >>>> >>> >> >> >>>> >> >> success. >>>> >>> >> >> >>>> >> >> >>>> >>> >> >> >>>> >> >> I need to ingest files generated by some >>>> >>> >> >> >>>> >> >> application. I >>>> >>> >> >> >>>> >> >> can >>>> >>> >> >> >>>> >> >> only >>>> >>> >> >> >>>> >> >> ingest >>>> >>> >> >> >>>> >> >> these files when a specific set exists. 
For example: >>>> >>> >> >> >>>> >> >> >>>> >>> >> >> >>>> >> >> file_123_456_ab.ex1 >>>> >>> >> >> >>>> >> >> file_123_456_cd.ex1 >>>> >>> >> >> >>>> >> >> file_123_456_ef.ex1 >>>> >>> >> >> >>>> >> >> file_123_456_gh.ex1 >>>> >>> >> >> >>>> >> >> file_123_456.ex2 >>>> >>> >> >> >>>> >> >> >>>> >>> >> >> >>>> >> >> Only when a set like that exists should I pick them >>>> >>> >> >> >>>> >> >> up >>>> >>> >> >> >>>> >> >> into >>>> >>> >> >> >>>> >> >> the >>>> >>> >> >> >>>> >> >> Flow. >>>> >>> >> >> >>>> >> >> The >>>> >>> >> >> >>>> >> >> parts I am looking for to "group" would "ab.ex1", >>>> >>> >> >> >>>> >> >> "cd.ex1", >>>> >>> >> >> >>>> >> >> "ef.ex1", >>>> >>> >> >> >>>> >> >> "gh.ex1", ".ex2". >>>> >>> >> >> >>>> >> >> >>>> >>> >> >> >>>> >> >> I tried to do this with some expression, but >>>> >>> >> >> >>>> >> >> couldn't >>>> >>> >> >> >>>> >> >> work >>>> >>> >> >> >>>> >> >> it >>>> >>> >> >> >>>> >> >> out. >>>> >>> >> >> >>>> >> >> >>>> >>> >> >> >>>> >> >> What would be the best way to achieve this? >>>> >>> >> >> >>>> >> >> >>>> >>> >> >> >>>> >> >> Many thanks! >>>> >>> >> >> >>>> > >>>> >>> >> >> >>>> > >>>> >>> >> >> >>> >>>> >>> >> >> >>> >>>> >>> >> >> >> >>>> >>> >> >> > >>>> >>> >> > >>>> >>> >> > >>>> >>> > >>>> >>> > >>>> >> >>>> >> >>> >>> >>
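Andrew's grouping idea from the message above could be prototyped along these lines, e.g. inside a scripting processor. This is a sketch only: the expected suffix set is hard-coded from the example filenames in the original question, and `offer()` is a hypothetical helper, not a NiFi API.

```python
import re
from collections import defaultdict

# Suffixes that make a complete set, per the example in the question:
# ab.ex1, cd.ex1, ef.ex1, gh.ex1, plus the bare .ex2 file.
EXPECTED = {"ab.ex1", "cd.ex1", "ef.ex1", "gh.ex1", "ex2"}
# file_<num>_<num>[_<type>].<ext>  -- the type part is optional (.ex2 file)
PATTERN = re.compile(r"^file_(\d+_\d+)(?:_(\w+))?\.(ex\d)$")

groups = defaultdict(dict)  # group id -> {suffix: filename}

def offer(filename):
    """Record one listed filename; return the whole set once complete."""
    m = PATTERN.match(filename)
    if not m:
        return None  # not a file we track
    group_id, typ, ext = m.groups()
    suffix = f"{typ}.{ext}" if typ else ext
    groups[group_id][suffix] = filename
    if set(groups[group_id]) == EXPECTED:
        return sorted(groups.pop(group_id).values())
    return None

for f in ("file_123_456_ab.ex1", "file_123_456_cd.ex1",
          "file_123_456_ef.ex1", "file_123_456.ex2"):
    assert offer(f) is None           # set not complete yet, keep waiting
print(offer("file_123_456_gh.ex1"))  # prints the complete set of 5 names
```

A real ExecuteScript prototype would additionally need to persist `groups` across onTrigger invocations (e.g. in processor state) and route the released filenames to the success relationship, ignoring incomplete sets as Andrew suggests.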
