BTW, which version are you using? I hope it is 1.4.0 or higher. There was an issue that affects your use case: https://issues.apache.org/jira/browse/NIFI-4028
On Thu, May 31, 2018 at 4:51 PM, Koji Kawamura wrote:

Hi Martijn,

I used the filename increment pattern based on your first filename example, file_123_456_ab.ex1, and incremented the 456 part. If the pattern has changed since then, that's fine.

Your current configuration looks like this:
- Given a filename: file_123_type3.ext1
- matched with the RegEx: ^file_(\d{3}_)(\w+)\.ext1$
- groupID will be: 123_ (including the underscore)
- counterName will be: type3

I was suggesting including the extension in the counterName. How about changing the RegEx to:
- RegEx: ^file_(\d+)_(\w+\.ext\d)$
- groupID will be: 123
- counterName will be: type3.ext1

Then you can route type1.ext1 to the Wait branch and the other 7 to the Notify branch. In the Wait branch, you need 7 Wait processors.

It would be faster to debug if you could share your flow template.

Thanks,
Koji

On Thu, May 31, 2018 at 3:15 PM, Martijn Dekkers wrote:

Thank you Koji,

I have tried once again, using your updated example. Unfortunately, things still get stuck at the first Wait processor's wait queue.
I did notice that the format of the files your example generates is different. I will try to clarify:

- 8 files in total:
  - file_123_type1.ext1
  - file_123_type1.ext2
  - file_123_type2.ext1
  - file_123_type2.ext2
  - file_123_type3.ext1
  - file_123_type3.ext2
  - file_123_type4.ext1
  - file_123_type4.ext2

For each set of 8 files, "file_123" increments, so the first set of 8 is "file_123", the next set is "file_124", and so on.
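The difference between the two RegExes can be checked outside NiFi. The Python sketch below uses a hypothetical `replace_first` helper to mimic `replaceFirst(regex, '$N')`, assuming NiFi's function behaves like Java's `String.replaceFirst` (Python's `re` and Java regex agree on these simple patterns). It runs both patterns over the eight names of one set listed above.

```python
import re

# Patterns quoted in the thread.
OLD = r"^file_(\d{3}_)(\w+)\.ext1$"  # current configuration
NEW = r"^file_(\d+)_(\w+\.ext\d)$"   # suggested replacement

def replace_first(subject, regex, group):
    """Hypothetical stand-in for ${filename:replaceFirst(regex, '$N')}.

    Replaces the first match with capture group N; if nothing matches,
    the subject comes back unchanged (as Java's String.replaceFirst does).
    """
    return re.sub(regex, lambda m: m.group(group), subject, count=1)

def extract(filename, regex):
    """Return (groupID, counterName) as the two UpdateAttribute expressions would."""
    return replace_first(filename, regex, 1), replace_first(filename, regex, 2)

# The eight names of one set, as listed in the message.
names = [f"file_123_type{t}.ext{e}" for t in range(1, 5) for e in (1, 2)]

for name in names:
    print(f"{name:22} old={extract(name, OLD)}  new={extract(name, NEW)}")
```

With the old pattern, an ext1 name gives groupID `123_`, but an ext2 name such as file_123_type3.ext2 does not match at all, so both attributes become the whole file name. If ext2 FlowFiles pass through an UpdateAttribute using that pattern, they could end up waiting on a signal identifier that no Notify ever uses. The suggested pattern gives `123` plus an extension-qualified counter name for all eight files. Also note that NiFi's core attribute is spelled `filename`; attribute names are case-sensitive, so it may be worth confirming what `${fileName}` evaluates to in the actual flow.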
When I look at your example, I notice that at the final step (LogAttribute after the FetchFile set) the filenames are file_123_<incrementing number>.ex(1|2).

My UpdateAttribute before the Notify branch is configured as:
groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
counterName - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$2')}

The UpdateAttribute before the Wait branch is configured as:
groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}

The 4 Wait processors in the Wait branch are configured as follows.
All Wait processors:
Release Signal Identifier - ${groupID}

For each individual Wait processor:
Signal Counter Name - type1
Signal Counter Name - type2
Signal Counter Name - type3
Signal Counter Name - type4

I am a bit stumped. The best success we had was a configuration with a RouteOnAttribute sending each of type1|type2|type3|type4 to its own Wait processor, a similar configuration for the Notify branch, and a final Wait/Notify pair that simply ensures we have the correct number of sets.

This configuration did exactly what we want, but unfortunately we had random FlowFiles stuck in the wait queue for no apparent reason.

Thanks,
Martijn

On 31 May 2018 at 05:23, Koji Kawamura wrote:

The order of arrival does not matter.

The Wait processor has an 'Expiration Duration' configuration, which defaults to 10 min. Please adjust it to your needs, i.e. the longest period to wait for a delayed file. If a FlowFile exceeds that duration, it is sent to the 'expired' relationship and can be handled differently, e.g. by writing an ERROR log.

> If we have a longer wait for a file, we'd like processing for the next
> groupid to still be able to continue.
In order to achieve that, use Wait Mode = 'Transfer to wait relationship', and configure the 'wait' relationship to use FirstInFirstOutPrioritizer. If FirstInFirstOutPrioritizer is not set, the same FlowFile will be processed repeatedly while it blocks other FlowFiles. With FirstInFirstOutPrioritizer, the processed FlowFile is re-queued at the end of the wait queue.

I've updated my example to make it more realistic by adding a delay for certain sets and types.
https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd

Thanks,
Koji

On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers wrote:

Cool, that will make things a lot simpler. Does it matter that the ext2 files arrive in random order? Sometimes there can be a very long delay before some of them show up, and we have some concerns about the overall flow blocking. If we have a longer wait for a file, we'd like processing for the next groupid to still be able to continue.

Thank you for your help (and for writing Wait/Notify!!)

Martijn

On 31 May 2018 at 03:49, Koji Kawamura wrote:

Glad to hear that was helpful.

"4 same types for each extension" can be treated as "8 distinct types" if the extension is included in the type:
ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2

Then route only 'ab.ex1' (or any one of them, but just 1) to the Wait branch, and the rest to the Notify branch. That will simplify the flow, if I'm not missing any other requirement.

Thanks!
Koji

On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers wrote:

Hi Koji, Many thanks for your continued assistance!
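The routing described above (one type to the Wait branch, the other 7 to Notify) and the FIFO re-queueing of the 'wait' relationship can be modeled in a few lines. This Python sketch is a hypothetical simulation, not NiFi code: a nested dict stands in for the shared signal cache, `notify` plays the Notify branch, `is_released` plays a chain of 7 Wait processors (one Signal Counter Name each), and `one_pass` re-appends an unreleased FlowFile to the tail of the queue. It ignores details such as what happens to a signal after release and the Expiration Duration.

```python
from collections import defaultdict, deque

# Hypothetical stand-in for the shared signal cache:
# signals[group_id][counter_name] -> number of Notify calls seen so far.
signals = defaultdict(lambda: defaultdict(int))

WAIT_TYPE = "type1.ext1"  # the single type routed to the Wait branch
ALL_TYPES = [f"type{t}.ext{e}" for t in range(1, 5) for e in (1, 2)]
NOTIFY_TYPES = [c for c in ALL_TYPES if c != WAIT_TYPE]  # the other 7

def notify(group_id, counter_name):
    """Notify branch: record that one file of this type arrived for the group."""
    signals[group_id][counter_name] += 1

def is_released(group_id):
    """Chain of 7 Wait processors: pass only if every counter has been signalled."""
    return all(signals[group_id][c] >= 1 for c in NOTIFY_TYPES)

def one_pass(wait_queue):
    """Look at each queued Wait-branch FlowFile once, in FIFO order.

    A FlowFile that is not yet released goes back to the tail of the queue,
    so a group with a late file does not block groups queued behind it.
    """
    released = []
    for _ in range(len(wait_queue)):
        group_id = wait_queue.popleft()
        if is_released(group_id):
            released.append(group_id)
        else:
            wait_queue.append(group_id)
    return released

# Two groups; their type1.ext1 FlowFiles are waiting, 123 ahead of 124.
queue = deque(["123", "124"])
for c in NOTIFY_TYPES:
    notify("124", c)             # group 124 is complete
for c in NOTIFY_TYPES[:-1]:
    notify("123", c)             # group 123 is still missing one file

first = one_pass(queue)          # 124 is released although 123 is ahead of it
notify("123", NOTIFY_TYPES[-1])  # the late file for group 123 arrives
second = one_pass(queue)
print(first, second, list(queue))  # ['124'] ['123'] []
```

Because the counter names include the extension, the order in which the 7 Notify signals arrive does not matter; only their presence per group does.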
> - 1 file per second is relatively low in terms of traffic; it should be processed fine with 1 thread
> - A flow like this, which is stateful across different parts of the flow, works best with a single thread, because using multiple threads could cause race conditions or concurrency issues if there's any implementation error

Yes, we had similar thoughts.

> - Based on the above, I strongly recommend NOT increasing "concurrent tasks". If you see FlowFiles staying in a wait queue, then there must be a different issue

We don't see many FlowFiles stuck in a wait queue. I ran a test over a few hours yesterday that simulates the way in which these files would appear (we would have 4 "ext1" files show up every second, and the "ext2" files can show up a few seconds later, not always in the same order), and we found perhaps 6 FlowFiles stuck in a wait queue.

> - Also, a concurrent tasks number like 400 is too much for any processor in general. I recommend incrementing it to 2, 3, 4 ... up to 8 or so, only if you see a clear benefit from doing so

Indeed, thanks for the suggestion. Once we have the logic finished and tested we will have to optimise this flow. The next step is to try to load the required processors into MiNiFi, as this will be running on many systems with limited capacity. If we don't manage with MiNiFi, we will still be good, but we prefer the smaller footprint and ease of management we can obtain with MiNiFi.
> - The important part of this flow is extracting 'groupId' and 'type' from file names. The Regular Expression needs to be configured properly.
> - I recommend using https://regex101.com/ to test your Regular Expression and see whether it extracts the correct groupId and type

Yes, we have tested our RegExes for this extensively.

> Lastly, regardless of how many files there should be for 'ext1' and 'ext2', the flow structure is simple, as below.
> Let's say there should be 8 files before processing starts.
> That is 4 x ex1 and 4 x ex2 in your case, but let's think of it as 8 file types.
> And I assume the types are known, meaning static, not dynamically changing.

Correct, the format is <groupID><type>.<ext> where:

- groupId is unique for each set of 8
- type has 4 variants (ab, cd, ef, gh), the same type set for each ext

> So, the rule is, "a set of files consists of 8 files, and a set should wait to be processed until all 8 files are ready", that's all.

For our use case it is important that we have exact, "positive identification" of each file.

> Then, the flow should be designed like below:
> 1. List files; each file will be sent as a FlowFile

Correct - we have several different ListFile processors for other sections of the flow; we are actually collecting many different sets, all variants of the above. However, those are far simpler (sets of 2 - ext1 and ext2 only).

> 2. Extract groupId and type from the filename

Correct.

> 3. Route FlowFiles into two branches, let's call these the 'Notify' branch and the 'Wait' branch, and pass only 1 type for a set to the Wait branch, and the remaining 7 types to the Notify branch

OK, we currently split the Notify branch to "all ext1" and the Wait branch to "all ext2".

> At the Notify branch (for the FlowFiles of the remaining 7 types, e.g. type 2, 3, 4 ... 8)

As mentioned, we only have 4 distinct types.

> 1. Notify that the type for a group has arrived.
> 2. Discard the FlowFile, because there's nothing else to do with it in this branch
>
> At the Wait branch (for the type 1 FlowFile)
> 1. Wait for type 2 for the groupId.
> 2. Wait for type 3 for the groupId, then type 4, 5 and so on
> 3. After passing the Wait for type 8, it is guaranteed that all 8 files are available (unless some other program deletes them)
> 4. Get the actual file content using FetchFile, and process it

Besides the "4 same types for each extension", this is configured as you describe.

> I hope it helps.

It does, thanks. I will extract this portion of the flow, sanitise it, and send it along - easier to see than to describe :)

> Thanks,
> Koji

Thank you so much once again!

Martijn

On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers wrote:

Hey Pierre,

Yes, we suspected as much, but we are only seeing this with the Wait processor. Possibly because that is the only "blocking" we have in this flow.

Thanks for the clarification, much appreciated!

Martijn

On 30 May 2018 at 10:30, Pierre Villard wrote:

I'll let Koji give more information about Wait/Notify; he is clearly the expert here.

I'm just jumping in regarding your "and when viewing the queue, the dialog states that the queue is empty." You're seeing this behavior because, even though the UI shows some FlowFiles in the queue, those FlowFiles are currently locked in the session of the running processor, and FlowFiles currently being processed in a session are not shown when listing a queue. If you stop the processor, the session will be closed and you'll be able to list the queue and see the FlowFiles.

I recall discussions in the past about improving the UX for this. Not sure we have a JIRA for it though...

Pierre

2018-05-30 8:26 GMT+02:00 Martijn Dekkers:

Hi Koji,

Thank you for responding. I had adjusted the run schedule to closely mimic our environment. We are expecting about 1 file per second or so.
We are also seeing some random "orphans" sitting in a wait queue every now and again that don't trigger a debug message, and when viewing the queue, the dialog states that the queue is empty.

We found the random "no signal found" issue to be significantly reduced when we increase "concurrent tasks" to something large - currently set to 400 for all Wait and Notify processors.

I do need to mention that our requirements have changed since you made the template, in that we are looking for a set of 8 files - 4 x "ext1" and 4 x "ext2", both with the same pattern: <groupid><type (4 of these)>.ext1 or .ext2

We found that the best way to make this work was to add another Wait/Notify pair, each processor coming after the ones already there, with a simple counter against the groupID.

I will export a template for you, many thanks for your help - I just need to spend some time sanitising the various fields etc.

Many thanks once again for your kind assistance.

Martijn

On 30 May 2018 at 08:14, Koji Kawamura wrote:

Hi Martijn,

In my template, I was using 'Run Schedule' = '5 secs' for the Wait processors to avoid overusing CPU resources. However, if you expect more throughput, it should be lowered.
I changed the Run Schedule to 0 sec and passed 100 groups of files through (400 files, because 4 files make up 1 set in my example); they reached the expected end of the flow without issue.

If you can share your flow and an example input file volume (hundreds of files were fine in my flow), I may be able to provide more useful comments.

Thanks,
Koji

On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers wrote:

Hi Koji,

I am running into many issues getting this to run reliably. When running it with a few FlowFiles at a time and stepping through by switching processors on and off, it works mostly fine, but running it at volume I receive many errors about "no release signal found".

I have tried to fix this in a few different ways, but the issue keeps coming back. It is also not consistent at all - different Wait processors will block different FlowFiles at different times, without any configuration change. Stop/start the flow, and different queues will fill up. Do you have any ideas what could be causing this behavior? I checked the DistributedMapCache Server/Client components, and they all appear to be working OK.
Thanks,

Martijn

On 28 May 2018 at 05:11, Koji Kawamura wrote:

Hi Martijn,

An alternative approach is to use the Wait/Notify processors. I have developed a similar flow using those before, and I believe it will work for your case. A NiFi flow template is available here:

https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd

Hope this helps,
Koji

On Sun, May 27, 2018 at 11:48 PM, Andrew Grande wrote:

Martijn,

Here's an idea you could explore. Have the ListFile processor work as usual, and create a custom component (start with a scripting one to prototype) that groups the filenames as needed. I don't know if the number of files in a set is different every time, so I'm trying to be more robust.

Once you have grouped and counted the set, you can transfer the names to the success relationship. Otherwise, ignore them and wait until the set is full.
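The core of the grouping idea above could be prototyped roughly as below. This is a hypothetical Python sketch of the in-memory logic only: `SetGrouper` and its methods are invented names, not a NiFi or scripting API. It assumes the expected parts of a set are known up front, and it does not address how a scripted processor would keep this state across restarts or cluster nodes.

```python
from collections import defaultdict

class SetGrouper:
    """Hypothetical in-memory grouper: collect names per group, release full sets."""

    def __init__(self, expected_parts):
        self.expected = frozenset(expected_parts)
        self.pending = defaultdict(dict)  # group key -> {part: filename}

    def add(self, group, part, filename):
        """Record one listed file; return the complete set of names, else None."""
        if part not in self.expected:
            return None  # not a member of any set: ignore it
        self.pending[group][part] = filename
        if self.expected.issubset(self.pending[group]):
            members = self.pending.pop(group)  # forget the group once released
            return sorted(members.values())
        return None  # set not full yet: keep waiting

# Parts from the original question, arriving in an arbitrary order.
grouper = SetGrouper(["ab.ex1", "cd.ex1", "ef.ex1", "gh.ex1", ".ex2"])
arrivals = ["cd.ex1", ".ex2", "ab.ex1", "gh.ex1", "ef.ex1"]
results = []
for part in arrivals:
    name = "file_123_456" + (part if part == ".ex2" else "_" + part)
    results.append(grouper.add("file_123_456", part, name))
print(results[-1])
```

Keying on the part name rather than a bare count means a duplicate listing of the same file cannot make an incomplete set look full.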
Andrew

On Sun, May 27, 2018, 7:29 AM Martijn Dekkers wrote:

Hello all,

I am trying to work out an issue, with little success.

I need to ingest files generated by some application. I can only ingest these files when a specific set exists. For example:

file_123_456_ab.ex1
file_123_456_cd.ex1
file_123_456_ef.ex1
file_123_456_gh.ex1
file_123_456.ex2

Only when a set like that exists should I pick them up into the flow. The parts I am looking for to "group" would be "ab.ex1", "cd.ex1", "ef.ex1", "gh.ex1" and ".ex2".

I tried to do this with some expression, but couldn't work it out.

What would be the best way to achieve this?

Many thanks!
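A single RegEx can split each of these names into a set key and a part. The Python sketch below assumes the naming shown above (two numeric fields, a two-letter lowercase type on the .ex1 files, no type on the .ex2 file); `PATTERN`, `split_name` and `complete_sets` are illustrative names, and the pattern would need adjusting if the real names differ.

```python
import re

# Assumed naming: file_<n>_<n>_<type>.ex1 or file_<n>_<n>.ex2
# Group 1 is the set key; group 2 or group 3 is the part.
PATTERN = re.compile(r"^(file_\d+_\d+)(?:_([a-z]{2}\.ex1)|(\.ex2))$")

REQUIRED = {"ab.ex1", "cd.ex1", "ef.ex1", "gh.ex1", ".ex2"}

def split_name(filename):
    """Return (set_key, part), or None if the name does not follow the pattern."""
    m = PATTERN.match(filename)
    if m is None:
        return None
    return m.group(1), m.group(2) or m.group(3)

def complete_sets(filenames):
    """Set keys for which every required part is present in this listing."""
    seen = {}
    for name in filenames:
        parsed = split_name(name)
        if parsed is not None:
            key, part = parsed
            seen.setdefault(key, set()).add(part)
    return sorted(key for key, parts in seen.items() if REQUIRED <= parts)

listing = [
    "file_123_456_ab.ex1", "file_123_456_cd.ex1", "file_123_456_ef.ex1",
    "file_123_456_gh.ex1", "file_123_456.ex2",
    "file_123_457_ab.ex1", "file_123_457.ex2",  # incomplete set
]
print(complete_sets(listing))
```

In a NiFi flow, the same two capture groups are what an UpdateAttribute expression would extract into a group identifier and a type.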
