Hi Martijn,

Thanks for elaborating on your requirements.  Here are a few comments:

- 1 file per second is relatively low traffic; it should be processed
fine with a single thread
- A flow like this, which is stateful across different parts of the
flow, works best with a single thread, because multiple threads can
cause race conditions or other concurrency issues if there is any
implementation error
- Based on the above, I strongly recommend NOT increasing "concurrent
tasks". If you see FlowFiles staying in a wait queue, there must be a
different issue
- Also, a concurrent tasks value like 400 is far too high in general,
for any processor. I recommend increasing it gradually (2, 3, 4 .. up
to 8 or so), and only if you see a clear benefit from doing so
- The important part of this flow is extracting 'groupId' and 'type'
from the file names, so the Regular Expression needs to be configured
properly (see the example just below this list).
- I recommend using https://regex101.com/ to test your Regular
Expression and confirm that it extracts the correct groupId and type
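
For illustration only, here is one way that extraction could look (not
necessarily how the template earlier in this thread does it), assuming the
file names really follow the 'file_123_456_ab.ex1' / 'file_123_456.ex2'
pattern from this thread, and using 'groupId' and 'type' as example
attribute names. In an UpdateAttribute processor you could add two dynamic
properties:

  groupId = ${filename:replaceAll('^(.+?)(?:_([a-z]{2}))?[.](ex1|ex2)$', '$1')}
  type    = ${filename:replaceAll('^(.+?)(?:_([a-z]{2}))?[.](ex1|ex2)$', '$2.$3')}

Against 'file_123_456_ab.ex1' this should give groupId=file_123_456 and
type=ab.ex1; against 'file_123_456.ex2' it should give groupId=file_123_456
and type=.ex2. If the '$1' back-reference is not interpreted on your version,
try escaping it as '$$1'. Either way, please test the pattern on regex101
against your real file names before relying on it.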

Lastly, regardless of how many files there should be for 'ext1' and
'ext2', the flow structure is as simple as below.
Let's say 8 files are needed before processing can start:
4 x ex1 and 4 x ex2 in your case, but let's think of them as 8 file types.
I also assume the types are known, i.e. static, not changing dynamically.
So the rule is simply: "a set consists of 8 files, and a set should
wait to be processed until all 8 files are ready", that's all.

Then, the flow should be designed like below:
1. List files; each file will be sent on as a FlowFile
2. Extract groupId and type from the filename
3. Route FlowFiles into two branches, let's call them the 'Notify'
branch and the 'Wait' branch: pass only 1 type per set to the Wait
branch and the other 7 types to the Notify branch (for example with
RouteOnAttribute; see the sketch just below)
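
As a rough sketch of step 3 (the attribute name 'type' comes from the
extraction example above, and picking '.ex2' as the one type that goes to
the Wait branch is an arbitrary assumption), a RouteOnAttribute processor
with the default 'Route to Property name' strategy could have a single
dynamic property such as:

  to-wait = ${type:equals('.ex2')}

FlowFiles matching it are routed to the 'to-wait' relationship (your Wait
branch), and everything else goes to 'unmatched' (your Notify branch).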

At the Notify branch (for the FlowFiles of the other 7 types, e.g. types 2, 3, 4 ... 8):
1. Notify that this type for the group has arrived (see the sketch below).
2. Discard the FlowFile, because there is nothing else to do with it in this branch.
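
A sketch of the Notify configuration for this branch (the property names
are those of the standard Notify processor; the values are assumptions
based on the attributes above):

  Release Signal Identifier = ${groupId}
  Signal Counter Name       = ${type}
  Distributed Cache Service = your DistributedMapCacheClientService

Each arriving FlowFile then increments the counter for its own type within
its group.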

At the Wait branch (for the type 1 FlowFile):
1. Wait for type 2 for the groupId (a configuration sketch follows below this list).
2. Wait for type 3 for the groupId, then type 4, 5 and so on.
3. After passing the Wait for type 8, it is guaranteed that all 8 files
are available (unless some other program deletes them).
4. Fetch the actual file content using FetchFile, and process it.
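
As a sketch of one Wait processor in that chain, say the one waiting for
'ab.ex1' (standard Wait processor properties; the values are assumptions
for this flow):

  Release Signal Identifier = ${groupId}
  Signal Counter Name       = ab.ex1
  Target Signal Count       = 1
  Expiration Duration       = long enough to cover the latest file in a set
  Distributed Cache Service = the same service the Notify side uses

Each subsequent Wait in the chain only changes the Signal Counter Name. The
'wait' relationship should be routed back to the same Wait processor so a
FlowFile keeps being re-checked until the matching Notify signal arrives.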

I hope it helps.

Thanks,
Koji


On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers <mart...@dekkers.org.uk> wrote:
> Hey Pierre,
>
> Yes, we suspected as much, but we are only seeing this with the Wait
> processor. Possibly because that is the only "blocking" we have in this
> flow.
>
> Thanks for the clarification, much appreciated!
>
> Martijn
>
> On 30 May 2018 at 10:30, Pierre Villard <pierre.villard...@gmail.com> wrote:
>>
>> I'll let Koji give more information about the Wait/Notify; he is clearly
>> the expert here.
>>
>> I'm just jumping in regarding your "and when viewing the queue, the dialog
>> states that the queue is empty.". You're seeing this behavior because, even
>> though the UI shows some flow files in the queue, the flow files are
>> currently locked in the session of the running processor and you won't see
>> flow files currently processed in a session when listing a queue. If you
>> stop the processor, the session will be closed and you'll be able to list
>> the queue and see the flow files.
>>
>> I recall discussions in the past to improve the UX for this. Not sure we
>> have a JIRA for it though...
>>
>> Pierre
>>
>> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers <mart...@dekkers.org.uk>:
>>>
>>> Hi Koji,
>>>
>>> Thank you for responding. I had adjusted the run schedule to closely
>>> mimic our environment. We are expecting about 1 file per second or so.
>>> We are also seeing some random "orphans" sitting in a wait queue every
>>> now and again that don't trigger a debug message, and when viewing the
>>> queue, the dialog states that the queue is empty.
>>>
>>> We found the random "no signal found" issue to be significantly decreased
>>> when we increased the "concurrent tasks" to something large - currently set
>>> to 400 for all wait and notify processors.
>>>
>>> I do need to mention that our requirements had changed since you made the
>>> template, in that we are looking for a set of 8 files - 4 x "ext1" and 4 x
>>> "ext2" both with the same pattern: <groupid><type (4 of these)>.ext1 or ext2
>>>
>>> We found that the best way to make this work was to add another
>>> wait/notify pair, each processor coming after the ones already there, with a
>>> simple counter against the groupID.
>>>
>>> I will export a template for you, many thanks for your help - I just need
>>> to spend some time sanitising the various fields etc.
>>>
>>> Many thanks once again for your kind assistance.
>>>
>>> Martijn
>>>
>>> On 30 May 2018 at 08:14, Koji Kawamura <ijokaruma...@gmail.com> wrote:
>>>>
>>>> Hi Martijn,
>>>>
>>>> In my template, I was using a 'Run Schedule' of '5 secs' for the Wait
>>>> processors to avoid overusing CPU resources. However, if you expect
>>>> more throughput, it should be lowered.
>>>> I changed the Run Schedule to 0 sec and passed 100 groups of files (400
>>>> files, because 4 files make 1 set in my example); they reached the
>>>> expected end of the flow without issue.
>>>>
>>>> If you can share your flow and example input file volume (hundreds of
>>>> files were fine in my flow), I may be able to provide a more useful
>>>> comment.
>>>>
>>>> Thanks,
>>>> Koji
>>>>
>>>> On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers
>>>> <mart...@dekkers.org.uk> wrote:
>>>> > Hi Koji,
>>>> >
>>>> > I am seeing many issues in getting this to run reliably. When running
>>>> > this with a few flowfiles at a time, and stepping through by switching
>>>> > processors on and off, it works mostly fine, but running this at volume
>>>> > I receive many errors about "no release signal found".
>>>> >
>>>> > I have tried to fix this in a few different ways, but the issue keeps
>>>> > coming back. This is also not consistent at all - different wait
>>>> > processors will block different flowfiles at different times, without
>>>> > changing any configuration. Stop/start the flow, and different queues
>>>> > will fill up. Do you have any ideas what could be causing this behavior?
>>>> > I checked the DistributedMapCache Server/Client components, and they all
>>>> > appear to be working OK.
>>>> >
>>>> > Thanks,
>>>> >
>>>> > Martijn
>>>> >
>>>> > On 28 May 2018 at 05:11, Koji Kawamura <ijokaruma...@gmail.com> wrote:
>>>> >>
>>>> >> Hi Martijn,
>>>> >>
>>>> >> An alternative approach is using the Wait/Notify processors.
>>>> >> I have developed a similar flow using those before, and I believe it
>>>> >> will work for your case.
>>>> >> A NiFi flow template is available here.
>>>> >> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>>>> >>
>>>> >> Hope this helps,
>>>> >> Koji
>>>> >>
>>>> >>
>>>> >> On Sun, May 27, 2018 at 11:48 PM, Andrew Grande <apere...@gmail.com>
>>>> >> wrote:
>>>> >> > Martijn,
>>>> >> >
>>>> >> > Here's an idea you could explore. Have the ListFile processor work
>>>> >> > as usual and create a custom component (start with a scripting one
>>>> >> > to prototype) grouping the filenames as needed. I don't know if the
>>>> >> > number of files in a set is different every time, so it's worth
>>>> >> > trying to be more robust.
>>>> >> >
>>>> >> > Once you group and count the set, you can transfer the names to the
>>>> >> > success relationship. Ignore them otherwise and wait until the set
>>>> >> > is full.
>>>> >> >
>>>> >> > Andrew
>>>> >> >
>>>> >> >
>>>> >> > On Sun, May 27, 2018, 7:29 AM Martijn Dekkers
>>>> >> > <mart...@dekkers.org.uk>
>>>> >> > wrote:
>>>> >> >>
>>>> >> >> Hello all,
>>>> >> >>
>>>> >> >> I am trying to work out an issue with little success.
>>>> >> >>
>>>> >> >> I need to ingest files generated by some application. I can only
>>>> >> >> ingest these files when a specific set exists. For example:
>>>> >> >>
>>>> >> >> file_123_456_ab.ex1
>>>> >> >> file_123_456_cd.ex1
>>>> >> >> file_123_456_ef.ex1
>>>> >> >> file_123_456_gh.ex1
>>>> >> >> file_123_456.ex2
>>>> >> >>
>>>> >> >> Only when a set like that exists should I pick them up into the
>>>> >> >> Flow. The parts I am looking for to "group" would be "ab.ex1",
>>>> >> >> "cd.ex1", "ef.ex1", "gh.ex1", ".ex2".
>>>> >> >>
>>>> >> >> I tried to do this with some expression, but couldn't work it out.
>>>> >> >>
>>>> >> >> What would be the best way to achieve this?
>>>> >> >>
>>>> >> >> Many thanks!
>>>> >
>>>> >
>>>
>>>
>>
>
