The order of arrival does not matter.

The Wait processor has an 'Expiration Duration' property, which defaults
to 10 min. Please adjust it to the longest period you are willing to
wait for a delayed file.
If a FlowFile waits longer than that duration, it is sent to the
'expired' relationship, where it can be handled differently, e.g. by
writing an ERROR log.
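
As a rough illustration of the expiration rule (a toy model in Python, not
NiFi code), the routing decision can be sketched as below. The WaitingFile
class, the route() function and the order of the checks are assumptions made
for this sketch only; the 10 min value mirrors the default mentioned above.

```python
from dataclasses import dataclass

# Hypothetical stand-in for a FlowFile sitting in front of Wait; not a NiFi class.
@dataclass
class WaitingFile:
    name: str
    wait_start_ms: int  # time at which the file first started waiting

EXPIRATION_MS = 10 * 60 * 1000  # mirrors the 10 min default

def route(f, now_ms, signal_arrived, expiration_ms=EXPIRATION_MS):
    """Return the name of the relationship this toy model would pick."""
    if signal_arrived:
        return "success"   # the awaited signal is there, carry on
    if now_ms - f.wait_start_ms > expiration_ms:
        return "expired"   # waited too long, handle separately (e.g. log ERROR)
    return "wait"          # keep waiting
```

A file that started waiting at t=0 stays in 'wait' until 10 minutes have
passed without a signal, and is routed to 'expired' after that.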

> If we have a longer wait for a file, we'd like processing for the next 
> groupid to still be able to continue.

To achieve that, set Wait Mode to 'Transfer to wait relationship', and
configure the connection for the 'wait' relationship to use
FirstInFirstOutPrioritizer.
If FirstInFirstOutPrioritizer is not set, the same FlowFile will be
processed again while it blocks other FlowFiles.
With FirstInFirstOutPrioritizer, the processed FlowFile is
re-queued at the end of the wait queue.
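
To show why the re-queue position matters, here is a small Python simulation.
It is only a toy model of the behavior described above, not how NiFi
prioritizers are implemented; drain() and its parameters are hypothetical
names used for this sketch.

```python
from collections import deque

def drain(queue_items, ready, requeue_at_end, max_passes=20):
    """Process a wait queue one item per pass.

    Returns (names released, in order; names still queued).
    """
    queue = deque(queue_items)
    released = []
    for _ in range(max_passes):
        if not queue:
            break
        item = queue.popleft()
        if item in ready:
            released.append(item)   # all signals arrived, let it through
        elif requeue_at_end:
            queue.append(item)      # FIFO style: retry later, behind the others
        else:
            queue.appendleft(item)  # same item is picked up again next pass
    return released, list(queue)

groups = ["g1", "g2", "g3"]
ready = {"g2", "g3"}  # g1 is still missing a file

print(drain(groups, ready, requeue_at_end=True))
print(drain(groups, ready, requeue_at_end=False))
```

With re-queueing at the end, g2 and g3 are released while g1 keeps waiting.
Without it, g1 stays at the head of the queue and nothing behind it is
released.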

I've updated my example to make it more realistic, by adding a delay for
a certain set and type.
https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd

Thanks,
Koji

On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers
<[email protected]> wrote:
> Cool, that will make things a lot simpler. Does it matter that the ext2
> files arrive in random order? Sometimes there can be a very long delay in
> some of them showing up, and we have some concerns about the overall flow
> blocking. If we have a longer wait for a file, we'd like processing for the
> next groupid to still be able to continue.
>
> Thank you for your help (and for writing Wait/Notify!!)
>
> Martijn
>
> On 31 May 2018 at 03:49, Koji Kawamura <[email protected]> wrote:
>>
>> Glad to hear that was helpful.
>>
>> "4 same type for each extension", can be treated as "8 distinct types"
>> if the extension is included in the type.
>> ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2
>>
>> Then route only 'ab.ex1' (or whichever, but just 1 of them) to the Wait
>> branch, and the rest to the Notify branch.
>> That will simplify the flow, unless I'm missing some other requirement.
>>
>> Thanks!
>> Koji
>>
>> On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers
>> <[email protected]> wrote:
>> > Hi Koji, Many thanks for your continued assistance!
>> >
>> >>
>> >> - 1 file per second is relatively low traffic, so it should
>> >> be processed fine with 1 thread
>> >> - A flow like this, which is stateful across different parts of the
>> >> flow, works best with a single thread, because using multiple threads
>> >> would cause race conditions or concurrency issues if there's any
>> >> implementation error
>> >
>> >
>> > Yes, we had similar thoughts.
>> >
>> >>
>> >> - Based on the above, I strongly recommend NOT increasing "concurrent
>> >> tasks". If you see FlowFiles staying in a wait queue, then there must
>> >> be a different issue
>> >
>> >
>> > We don't see many flowfiles stuck in a wait queue, I ran a test over a
>> > few hours yesterday that simulates the way in which these files would
>> > appear (we would have 4 of "ext1" show up every second, and the "ext2"
>> > can show up a few seconds later, and not always in the same order) and
>> > we found perhaps 6 flowfiles stuck in a wait queue.
>> >
>> >>
>> >> - Also, a concurrent tasks number like 400 is too much in general
>> >> for any processor. I recommend incrementing it step by step (2, 3, 4 ...)
>> >> up to 8 or so, and only if you see a clear benefit from doing so
>> >
>> >
>> > Indeed, thanks for the suggestion. Once we have the logic finished and
>> > tested we will have to optimise this Flow. The next step is to try to
>> > load the required processors into MiNiFy, as this will be running on
>> > many systems with limited capacity. If we don't manage with MiNiFy, we
>> > will still be good, but we prefer to have the smaller footprint and
>> > ease of management we can obtain with MiNiFy.
>> >
>> >>
>> >> - The important part of this flow is extracting 'groupId' and 'type'
>> >> from file names. Regular Expression needs to be configured properly.
>> >> - I recommend using https://regex101.com/ to test your Regular
>> >> Expression to see whether it can extract correct groupId and type
>> >
>> >
>> > Yes, we have tested our RegExes for this extensively
>> >
>> >>
>> >>
>> >> Lastly, regardless of how many files there should be for 'ext1' and
>> >> 'ext2', the flow structure is as simple as below.
>> >> Let's say there should be 8 files before processing can start.
>> >> 4 x ex1 and 4 x ex2 in your case, but let's think of them as 8 file types.
>> >> And I assume the types are known, meaning static, not changing
>> >> dynamically.
>> >
>> >
>> > Correct, the format is <groupID><type>.<ext> where:
>> >
>> > - groupId is unique for each set of 8
>> > - type has 4 variants (ab, cd, ef, gh), the same type-set for each ext
>> >
>> >> So, the rule is, "a set of files consists of 8 files, and a set should
>> >> wait to be processed until all 8 files are ready", that's all.
>> >
>> >
>> > For our use case it is important that we have exact "positive
>> > identification" of each file.
>> >
>> >>
>> >> Then, the flow should be designed like below:
>> >> 1. List files, each file will be sent as a FlowFile
>> >
>> >
>> > Correct - we have several different listfiles for other sections of the
>> > flow, we are actually collecting many different sets, all variants of
>> > the above. However, those are far simpler (sets of 2 - ext1 and ext2 only)
>> >
>> >>
>> >> 2. Extract groupId and type from filename
>> >
>> >
>> > Correct
>> >
>> >>
>> >> 3. Route FlowFiles into two branches, let's call these the 'Notify' branch
>> >> and the 'Wait' branch, and pass only 1 type per set to the Wait branch,
>> >> and the remaining 7 types to the Notify branch
>> >
>> >
>> > OK, we currently split Notify branch to "all ext1" and wait branch to
>> > "all ext2"
>> >
>> >>
>> >> At the Notify branch (for FlowFiles of the remaining 7 types, e.g. type
>> >> 2, 3, 4 ... 8)
>> >
>> >
>> > As mentioned, we only have 4 distinct types.
>> >
>> >>
>> >> 1. Notify that the type for a group has arrived.
>> >> 2. Discard the FlowFile, because there's nothing to do with it in this
>> >> branch
>> >
>> >
>> >
>> >>
>> >> At the Wait branch (for the type 1 FlowFile)
>> >> 1. Wait for type 2 for the groupId.
>> >> 2. Wait for type 3 for the groupId, then type 4, 5 and so on
>> >> 3. After passing the Wait for type 8, it is guaranteed that all 8 files
>> >> are available (unless some other program is deleting them)
>> >> 4. Get actual file content using FetchFile, and process it
>> >
>> >
>> > Besides the "4 same types for each extension", this is configured as you
>> > describe.
>> >
>> >>
>> >> I hope it helps.
>> >>
>> >
>> > It does, thanks. I will extract this portion of the flow, sanitise, and
>> > send it along - easier to see than to describe :)
>> >
>> >
>> >>
>> >> Thanks,
>> >> Koji
>> >
>> >
>> > Thank you so much once again!
>> >
>> > Martijn
>> >
>> >
>> >
>> >>
>> >>
>> >> On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers
>> >> <[email protected]>
>> >> wrote:
>> >> > Hey Pierre,
>> >> >
>> >> > Yes, we suspected as much, but we are only seeing this with the Wait
>> >> > processor. Possibly because that is the only "blocking" we have in
>> >> > this flow.
>> >> >
>> >> > Thanks for the clarification, much appreciated!
>> >> >
>> >> > Martijn
>> >> >
>> >> > On 30 May 2018 at 10:30, Pierre Villard <[email protected]>
>> >> > wrote:
>> >> >>
>> >> >> I'll let Koji give more information about the Wait/Notify, he is
>> >> >> clearly the expert here.
>> >> >>
>> >> >> I'm just jumping in regarding your "and when viewing the queue, the
>> >> >> dialog states that the queue is empty.". You're seeing this behavior
>> >> >> because, even though the UI shows some flow files in the queue, the
>> >> >> flow files are currently locked in the session of the running
>> >> >> processor and you won't see flow files currently processed in a
>> >> >> session when listing a queue. If you stop the processor, the session
>> >> >> will be closed and you'll be able to list the queue and see the flow
>> >> >> files.
>> >> >>
>> >> >> I recall discussions in the past to improve the UX for this. Not
>> >> >> sure we have a JIRA for it though...
>> >> >>
>> >> >> Pierre
>> >> >>
>> >> >> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers <[email protected]>:
>> >> >>>
>> >> >>> Hi Koji,
>> >> >>>
>> >> >>> Thank you for responding. I had adjusted the run schedule to closely
>> >> >>> mimic our environment. We are expecting about 1 file per second or so.
>> >> >>> We are also seeing some random "orphans" sitting in a wait queue every
>> >> >>> now and again that don't trigger a debug message, and when viewing the
>> >> >>> queue, the dialog states that the queue is empty.
>> >> >>>
>> >> >>> We found the random "no signal found" issue to be significantly
>> >> >>> decreased when we increase the "concurrent tasks" to something large -
>> >> >>> currently set to 400 for all wait and notify processors.
>> >> >>>
>> >> >>> I do need to mention that our requirements had changed since you made
>> >> >>> the template, in that we are looking for a set of 8 files - 4 x "ext1"
>> >> >>> and 4 x "ext2" both with the same pattern: <groupid><type (4 of
>> >> >>> these)>.ext1 or ext2
>> >> >>>
>> >> >>> We found that the best way to make this work was to add another
>> >> >>> wait/notify pair, each processor coming after the ones already there,
>> >> >>> with a simple counter against the groupID.
>> >> >>>
>> >> >>> I will export a template for you, many thanks for your help - I just
>> >> >>> need to spend some time sanitising the various fields etc.
>> >> >>>
>> >> >>> Many thanks once again for your kind assistance.
>> >> >>>
>> >> >>> Martijn
>> >> >>>
>> >> >>> On 30 May 2018 at 08:14, Koji Kawamura <[email protected]>
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> Hi Martijn,
>> >> >>>>
>> >> >>>> In my template, I was using a 'Run Schedule' of '5 secs' for the Wait
>> >> >>>> processors to avoid overusing CPU resources. However, if you expect
>> >> >>>> more throughput, it should be lowered.
>> >> >>>> I changed Run Schedule to 0 sec and passed 100 groups of files (400
>> >> >>>> files, because 4 files are 1 set in my example), and they reached the
>> >> >>>> expected goal of the flow without issue.
>> >> >>>>
>> >> >>>> If you can share your flow and example input file volume (hundreds of
>> >> >>>> files were fine in my flow), I may be able to provide more useful
>> >> >>>> comments.
>> >> >>>>
>> >> >>>> Thanks,
>> >> >>>> Koji
>> >> >>>>
>> >> >>>> On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers
>> >> >>>> <[email protected]> wrote:
>> >> >>>> > Hi Koji,
>> >> >>>> >
>> >> >>>> > I am having many issues getting this to run reliably. When running
>> >> >>>> > this with a few flowfiles at a time, and stepping through by
>> >> >>>> > switching processors on and off, it works mostly fine, but running
>> >> >>>> > this at volume I receive many errors about "no release signal found"
>> >> >>>> >
>> >> >>>> > I have tried to fix this in a few different ways, but the issue keeps
>> >> >>>> > coming back. This is also not consistent at all - different wait
>> >> >>>> > processors will block different flowfiles at different times, without
>> >> >>>> > changing any configuration. Stop/Start the flow, and different queues
>> >> >>>> > will fill up. Do you have any ideas what could be causing this
>> >> >>>> > behavior? I checked the DistributedMapCache Server/Client components,
>> >> >>>> > and they all appear to be working OK.
>> >> >>>> >
>> >> >>>> > Thanks,
>> >> >>>> >
>> >> >>>> > Martijn
>> >> >>>> >
>> >> >>>> > On 28 May 2018 at 05:11, Koji Kawamura <[email protected]>
>> >> >>>> > wrote:
>> >> >>>> >>
>> >> >>>> >> Hi Martijn,
>> >> >>>> >>
>> >> >>>> >> An alternative approach is to use the Wait/Notify processors.
>> >> >>>> >> I have developed a similar flow using those before, and I believe it
>> >> >>>> >> will work for your case.
>> >> >>>> >> A NiFi flow template is available here.
>> >> >>>> >>
>> >> >>>> >>
>> >> >>>> >> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>> >> >>>> >>
>> >> >>>> >> Hope this helps,
>> >> >>>> >> Koji
>> >> >>>> >>
>> >> >>>> >>
>> >> >>>> >> On Sun, May 27, 2018 at 11:48 PM, Andrew Grande
>> >> >>>> >> <[email protected]>
>> >> >>>> >> wrote:
>> >> >>>> >> > Martijn,
>> >> >>>> >> >
>> >> >>>> >> > Here's an idea you could explore. Have the ListFile processor work as
>> >> >>>> >> > usual and create a custom component (start with a scripting one to
>> >> >>>> >> > prototype) grouping the filenames as needed. I don't know if the
>> >> >>>> >> > number of files in a set is different every time, so trying to be
>> >> >>>> >> > more robust.
>> >> >>>> >> >
>> >> >>>> >> > Once you group and count the set, you can transfer the names to the
>> >> >>>> >> > success relationship. Ignore otherwise and wait until the set is full.
>> >> >>>> >> >
>> >> >>>> >> > Andrew
>> >> >>>> >> >
>> >> >>>> >> >
>> >> >>>> >> > On Sun, May 27, 2018, 7:29 AM Martijn Dekkers
>> >> >>>> >> > <[email protected]>
>> >> >>>> >> > wrote:
>> >> >>>> >> >>
>> >> >>>> >> >> Hello all,
>> >> >>>> >> >>
>> >> >>>> >> >> I am trying to work out an issue with little success.
>> >> >>>> >> >>
>> >> >>>> >> >> I need to ingest files generated by some application. I can only
>> >> >>>> >> >> ingest these files when a specific set exists. For example:
>> >> >>>> >> >>
>> >> >>>> >> >> file_123_456_ab.ex1
>> >> >>>> >> >> file_123_456_cd.ex1
>> >> >>>> >> >> file_123_456_ef.ex1
>> >> >>>> >> >> file_123_456_gh.ex1
>> >> >>>> >> >> file_123_456.ex2
>> >> >>>> >> >>
>> >> >>>> >> >> Only when a set like that exists should I pick them up into the Flow.
>> >> >>>> >> >> The parts I am looking to "group" on would be "ab.ex1", "cd.ex1",
>> >> >>>> >> >> "ef.ex1", "gh.ex1", ".ex2".
>> >> >>>> >> >>
>> >> >>>> >> >> I tried to do this with some expression, but couldn't work it out.
>> >> >>>> >> >>
>> >> >>>> >> >> What would be the best way to achieve this?
>> >> >>>> >> >>
>> >> >>>> >> >> Many thanks!
>> >> >>>> >
>> >> >>>> >
>> >> >>>
>> >> >>>
>> >> >>
>> >> >
>> >
>> >
>
>
