Cool, that will make things a lot simpler. Does it matter that the ext2 files arrive in random order? Some of them can take a very long time to show up, and we are concerned about the overall flow blocking. If we have to wait longer for one file, we'd like processing of the next groupId to be able to continue.
Thank you for your help (and for writing Wait/Notify!!)

Martijn

On 31 May 2018 at 03:49, Koji Kawamura <[email protected]> wrote:

> Glad to hear that was helpful.
>
> "4 same type for each extension", can be treated as "8 distinct types"
> if an extension is included in a type.
> ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2
>
> Then route only 'ab.ex1' (or whichever but just 1 of them) to the Wait
> branch, and the rest to Notify branch.
> That will simplify the flow, if I'm not missing any other requirement.
>
> Thanks!
> Koji
>
> On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers
> <[email protected]> wrote:
> > Hi Koji, Many thanks for your continued assistance!
> >
> >>
> >> - 1 file per second is relatively low in terms of traffic, it should
> >> be processed fine with 1 thread
> >> - A flow like this, which is stateful across different parts of the
> >> flow works at best with single thread, because using multiple threads
> >> would cause race condition or concurrency issue if there's any
> >> implementation error
> >
> > Yes, we had similar thoughts.
> >
> >>
> >> - Based on above, I strongly recommend to NOT increase "concurrent
> >> tasks". If you see FlowFiles staying in a wait queue, then there must
> >> be different issue
> >
> > We don't see many flowfiles stuck in a wait queue, I ran a test over a few
> > hours yesterday that simulates the way in which these files would appear (we
> > would have 4 of "ext1" show up every second, and the "ext2" can show up a
> > few seconds later, and not always in the same order) and we found perhaps 6
> > flowfiles stuck in a wait queue.
> >
> >>
> >> - Also, using concurrent tasks number like 400 is too much in general
> >> for all processors. I recommend to increment it as 2, 3, 4 .. up to 8
> >> or so, only if you see the clear benefit by doing so
> >
> > Indeed, thanks for the suggestion. Once we have the logic finished and
> > tested we will have to optimise this Flow.
> > The next step is to try to load the required processors into MiNiFi, as
> > this will be running on many systems with limited capacity. If we don't
> > manage with MiNiFi, we will still be good, but we prefer to have the
> > smaller footprint and ease of management we can obtain with MiNiFi.
> >
> >>
> >> - The important part of this flow is extracting 'groupId' and 'type'
> >> from file names. Regular Expression needs to be configured properly.
> >> - I recommend using https://regex101.com/ to test your Regular
> >> Expression to see whether it can extract correct groupId and type
> >
> > Yes, we have tested our RegExes for this extensively
> >
> >>
> >> Lastly, regardless of how many files should be there for 'ext1' and
> >> 'ext2', the flow structure is simple as below.
> >> Let's say there should be 8 files to start processing those.
> >> 4 x ex1, and 4 ex2 in your case, but let's think it as 8 file types.
> >> And I assume the types are known, meaning, static, not dynamically change.
> >
> > Correct, the format is <groupID><type>.<ext> where:
> >
> > - groupId is unique for each set of 8
> > - type has 4 variants (ab, cd, ef, gh), the same type-set for each ext
> >
> >> So, the rule is, "a set of files consists of 8 files, and a set should
> >> wait to be processed until all 8 files are ready", that's all.
> >
> > For our use case it is important that we have exact "positive
> > identification" of each file.
> >
> >>
> >> Then, the flow should be designed like below:
> >> 1. List files, each file will be sent as a FlowFile
> >
> > Correct - we have several different listfiles for other sections of the
> > flow, we are actually collecting many different sets, all variants of the
> > above. However, those are far simpler (sets of 2 - ext1 and ext2 only)
> >
> >>
> >> 2. Extract groupId and type from filename
> >
> > Correct
> >
> >>
> >> 3. Route FlowFiles into two branches, let's call these 'Notify' branch
> >> and 'Wait' branch, and pass only 1 type for a set to Wait-branch, and
> >> the rest 7 types to Notify-branch
> >
> > OK, we currently split Notify branch to "all ext1" and wait branch to "all
> > ext2"
> >
> >>
> >> At Notify branch (for the rest 7 types FlowFile, e.g. type 2, 3, 4 ... 8)
> >
> > As mentioned, we only have 4 distinct types.
> >
> >>
> >> 1. Notify that the type for a group has arrived.
> >> 2. Discard the FlowFile, because there's nothing to do with it in this
> >> branch
> >
> >>
> >> At Wait branch (for the type 1 FlowFile)
> >> 1. Wait for type 2 for the groupId.
> >> 2. Wait for type 3 for the groupId, type 4, 5 and so on
> >> 3. After passing Wait for type 8, it can guarantee that all 8 files
> >> are available (unless there is any other program deleting those)
> >> 4. Get actual file content using FetchFile, and process it
> >
> > Besides the "4 same types for each extension", this is configured as you
> > describe.
> >
> >>
> >> I hope it helps.
> >>
> >
> > It does, thanks. I will extract this portion of the flow, sanitise, and send
> > it along - easier to see than to describe :)
> >
> >>
> >> Thanks,
> >> Koji
> >
> > Thank you so much once again!
> >
> > Martijn
> >
> >>
> >> On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers <[email protected]>
> >> wrote:
> >> > Hey Pierre,
> >> >
> >> > Yes, we suspected as much, but we are only seeing this with the Wait
> >> > processor. Possibly because that is the only "blocking" we have in this
> >> > flow.
> >> >
> >> > Thanks for the clarification, much appreciated!
> >> >
> >> > Martijn
> >> >
> >> > On 30 May 2018 at 10:30, Pierre Villard <[email protected]>
> >> > wrote:
> >> >>
> >> >> I'll let Koji give more information about the Wait/Notify, he is
> >> >> clearly the expert here.
> >> >>
> >> >> I'm just jumping in regarding your "and when viewing the queue, the
> >> >> dialog states that the queue is empty.". You're seeing this behavior
> >> >> because, even though the UI shows some flow files in the queue, the
> >> >> flow files are currently locked in the session of the running
> >> >> processor and you won't see flow files currently processed in a
> >> >> session when listing a queue. If you stop the processor, the session
> >> >> will be closed and you'll be able to list the queue and see the flow
> >> >> files.
> >> >>
> >> >> I recall discussions in the past to improve the UX for this. Not sure
> >> >> we have a JIRA for it though...
> >> >>
> >> >> Pierre
> >> >>
> >> >> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers <[email protected]>:
> >> >>>
> >> >>> Hi Koji,
> >> >>>
> >> >>> Thank you for responding. I had adjusted the run schedule to closely
> >> >>> mimic our environment. We are expecting about 1 file per second or so.
> >> >>> We are also seeing some random "orphans" sitting in a wait queue every
> >> >>> now and again that don't trigger a debug message, and when viewing the
> >> >>> queue, the dialog states that the queue is empty.
> >> >>>
> >> >>> We found the random "no signal found" issue to be significantly
> >> >>> decreased when we increase the "concurrent tasks" to something large -
> >> >>> currently set to 400 for all wait and notify processors.
> >> >>>
> >> >>> I do need to mention that our requirements had changed since you made
> >> >>> the template, in that we are looking for a set of 8 files - 4 x "ext1"
> >> >>> and 4 x "ext2" both with the same pattern:
> >> >>> <groupid><type (4 of these)>.ext1 or ext2
> >> >>>
> >> >>> We found that the best way to make this work was to add another
> >> >>> wait/notify pair, each processor coming after the ones already there,
> >> >>> with a simple counter against the groupID.
> >> >>>
> >> >>> I will export a template for you, many thanks for your help - I just
> >> >>> need to spend some time sanitising the various fields etc.
> >> >>>
> >> >>> Many thanks once again for your kind assistance.
> >> >>>
> >> >>> Martijn
> >> >>>
> >> >>> On 30 May 2018 at 08:14, Koji Kawamura <[email protected]> wrote:
> >> >>>>
> >> >>>> Hi Martijn,
> >> >>>>
> >> >>>> In my template, I was using 'Run Schedule' as '5 secs' for the Wait
> >> >>>> processors to avoid overusing CPU resource. However, if you expect
> >> >>>> more throughput, it should be lowered.
> >> >>>> Changed Run Schedule to 0 sec, and I passed 100 group of files (400
> >> >>>> files because 4 files are 1 set in my example), they reached to the
> >> >>>> expected goal of the flow without issue.
> >> >>>>
> >> >>>> If you can share your flow and example input file volume (hundreds of
> >> >>>> files were fine in my flow), I may be able to provide more useful
> >> >>>> comment.
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Koji
> >> >>>>
> >> >>>> On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers
> >> >>>> <[email protected]> wrote:
> >> >>>> > Hi Koji,
> >> >>>> >
> >> >>>> > I am seeing many issues to get this to run reliably.
> >> >>>> > When running this with a few flowfiles at a time, and stepping
> >> >>>> > through by switching processors on and off it works mostly fine,
> >> >>>> > but running this at volume I receive many errors about "no
> >> >>>> > release signal found"
> >> >>>> >
> >> >>>> > I have tried to fix this in a few different ways, but the issue
> >> >>>> > keeps coming back. This is also not consistent at all - different
> >> >>>> > wait processors will block different flowfiles at different
> >> >>>> > times, without changing any configuration. Stop/Start the flow,
> >> >>>> > and different queues will fill up. Do you have any ideas what
> >> >>>> > could be causing this behavior? I checked the DistributedMapCache
> >> >>>> > Server/Client components, and they all appear to be working OK.
> >> >>>> >
> >> >>>> > Thanks,
> >> >>>> >
> >> >>>> > Martijn
> >> >>>> >
> >> >>>> > On 28 May 2018 at 05:11, Koji Kawamura <[email protected]>
> >> >>>> > wrote:
> >> >>>> >>
> >> >>>> >> Hi Martijn,
> >> >>>> >>
> >> >>>> >> Alternative approach is using Wait/Notify processors.
> >> >>>> >> I have developed similar flow using those before, and it will work
> >> >>>> >> with your case I believe.
> >> >>>> >> A NiFi flow template is available here.
> >> >>>> >>
> >> >>>> >> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
> >> >>>> >>
> >> >>>> >> Hope this helps,
> >> >>>> >> Koji
> >> >>>> >>
> >> >>>> >> On Sun, May 27, 2018 at 11:48 PM, Andrew Grande
> >> >>>> >> <[email protected]> wrote:
> >> >>>> >> > Martijn,
> >> >>>> >> >
> >> >>>> >> > Here's an idea you could explore.
> >> >>>> >> > Have the ListFile processor work as usual and create a custom
> >> >>>> >> > component (start with a scripting one to prototype) grouping
> >> >>>> >> > the filenames as needed. I don't know if the number of files
> >> >>>> >> > in a set is different every time, so trying to be more robust.
> >> >>>> >> >
> >> >>>> >> > Once you group and count the set, you can transfer the names to
> >> >>>> >> > the success relationship. Ignore otherwise and wait until the
> >> >>>> >> > set is full.
> >> >>>> >> >
> >> >>>> >> > Andrew
> >> >>>> >> >
> >> >>>> >> > On Sun, May 27, 2018, 7:29 AM Martijn Dekkers
> >> >>>> >> > <[email protected]> wrote:
> >> >>>> >> >>
> >> >>>> >> >> Hello all,
> >> >>>> >> >>
> >> >>>> >> >> I am trying to work out an issue with little success.
> >> >>>> >> >>
> >> >>>> >> >> I need to ingest files generated by some application. I can
> >> >>>> >> >> only ingest these files when a specific set exists. For
> >> >>>> >> >> example:
> >> >>>> >> >>
> >> >>>> >> >> file_123_456_ab.ex1
> >> >>>> >> >> file_123_456_cd.ex1
> >> >>>> >> >> file_123_456_ef.ex1
> >> >>>> >> >> file_123_456_gh.ex1
> >> >>>> >> >> file_123_456.ex2
> >> >>>> >> >>
> >> >>>> >> >> Only when a set like that exists should I pick them up into the
> >> >>>> >> >> Flow. The parts I am looking for to "group" would be "ab.ex1",
> >> >>>> >> >> "cd.ex1", "ef.ex1", "gh.ex1", ".ex2".
> >> >>>> >> >>
> >> >>>> >> >> I tried to do this with some expression, but couldn't work it
> >> >>>> >> >> out.
> >> >>>> >> >>
> >> >>>> >> >> What would be the best way to achieve this?
> >> >>>> >> >>
> >> >>>> >> >> Many thanks!
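
For reference, the rule discussed in this thread ("a set is 8 files, 4 types x 2 extensions, and it is ready only when all 8 have arrived, in whatever order") can be sketched outside NiFi in plain Python. This is only an illustration of the bookkeeping, not NiFi configuration and not how Wait/Notify is implemented. The regular expression assumes names shaped like "file_123_456_ab.ex1" (groupId, underscore, type, dot, extension); the underscore separator, the SetTracker class and its method names are all hypothetical.

```python
import re
from collections import defaultdict

# Hypothetical pattern: assumes <groupId>_<type>.<ext>, with the four types
# and two extensions mentioned in the thread. Adjust to the real file names.
FILENAME_RE = re.compile(
    r"^(?P<group_id>.+)_(?P<type>ab|cd|ef|gh)\.(?P<ext>ex1|ex2)$"
)

# The 8 (type, ext) members that every complete set must contain.
EXPECTED = frozenset(
    (t, e) for t in ("ab", "cd", "ef", "gh") for e in ("ex1", "ex2")
)


class SetTracker:
    """Tracks which members of each group have arrived, in any order."""

    def __init__(self):
        self._seen = defaultdict(set)

    def add(self, filename):
        """Record one file; return its groupId if the set is now complete."""
        match = FILENAME_RE.match(filename)
        if match is None:
            return None  # name does not belong to any set
        group_id = match["group_id"]
        self._seen[group_id].add((match["type"], match["ext"]))
        if self._seen[group_id] == EXPECTED:
            del self._seen[group_id]  # set is complete, stop tracking it
            return group_id
        return None

    def pending(self):
        """Return the missing (type, ext) members for each incomplete group."""
        return {g: sorted(EXPECTED - seen) for g, seen in self._seen.items()}
```

Because each groupId is tracked separately, a group that is still missing a late file does not hold up a later group in this sketch; whether the NiFi flow behaves the same way depends on how the Wait processors and their queues are configured.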
