Hi Martijn,
My example generates incrementing filenames based on your first filename example:
file_123_456_ab.ex1
It increments the 456 part. If your naming has changed since then, that's fine.
Your current configuration looks like this:
- Given a filename: file_123_type3.ext1
- matched with a RegEx: ^file_(\d{3}_)(\w+)\.ext1$
- groupID will be: 123_ (including the underscore)
- counterName will be: type3
I was suggesting including the extension in the counterName.
How about changing the RegEx to:
- RegEx: ^file_(\d+)_(\w+\.ext\d)$
- groupID will be: 123
- counterName will be: type3.ext1
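As a quick check of the two patterns, here is a minimal Python sketch. It assumes Python's re module treats these particular expressions the same way as the Java regex engine behind NiFi's replaceFirst; the helper name extract is only for illustration.

```python
import re

# The pattern currently configured, and the proposed replacement.
CURRENT = r'^file_(\d{3}_)(\w+)\.ext1$'
PROPOSED = r'^file_(\d+)_(\w+\.ext\d)$'

def extract(pattern, filename):
    """Return (groupID, counterName), or None if the filename does not match."""
    m = re.match(pattern, filename)
    return m.groups() if m else None

print(extract(CURRENT, 'file_123_type3.ext1'))   # ('123_', 'type3')
print(extract(CURRENT, 'file_123_type3.ext2'))   # None
print(extract(PROPOSED, 'file_123_type3.ext1'))  # ('123', 'type3.ext1')
print(extract(PROPOSED, 'file_123_type3.ext2'))  # ('123', 'type3.ext2')
```

The current pattern does not match .ext2 names at all, while the proposed one gives the same groupID for both extensions and a distinct counterName for each of the 8 files.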
Then you can route type1.ext1 to the Wait branch and the other 7 types to the Notify branch.
In the Wait branch, you need 7 Wait processors, one for each of the other counterNames.
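The routing and the 7 Wait processors can be pictured with a small simulation. This is plain Python, not NiFi code: the dictionary stands in for the signal counters that Notify writes to the cache, and the names notify, can_release, WAIT_TYPE and NOTIFY_TYPES are hypothetical.

```python
import re
from collections import defaultdict

PATTERN = re.compile(r'^file_(\d+)_(\w+\.ext\d)$')
WAIT_TYPE = 'type1.ext1'  # the single type routed to the Wait branch
# The other 7 types, which go to the Notify branch.
NOTIFY_TYPES = [f'type{n}.ext{e}' for n in range(1, 5) for e in (1, 2)
                if f'type{n}.ext{e}' != WAIT_TYPE]

signals = defaultdict(set)  # groupID -> counterNames notified so far

def notify(filename):
    """Notify branch: record that this type has arrived for its group."""
    group_id, counter_name = PATTERN.match(filename).groups()
    signals[group_id].add(counter_name)

def can_release(filename):
    """Wait branch: pass only when all 7 other types have been notified."""
    group_id, _ = PATTERN.match(filename).groups()
    return all(t in signals[group_id] for t in NOTIFY_TYPES)

waiting = 'file_123_type1.ext1'
for t in NOTIFY_TYPES[:-1]:
    notify(f'file_123_{t}')
print(can_release(waiting))  # False: one type is still missing
notify(f'file_123_{NOTIFY_TYPES[-1]}')
print(can_release(waiting))  # True: all 7 signals are present
```

Each membership check in can_release corresponds to one Wait processor with its own Signal Counter Name, all sharing ${groupID} as the Release Signal Identifier.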
It would be faster to debug if you could share your flow template.
Thanks,
Koji
On Thu, May 31, 2018 at 3:15 PM, Martijn Dekkers <[email protected]> wrote:
> Thank you Koji,
>
> I have tried once again, using your updated example. Unfortunately, things
> still get stuck at the first Wait processor's wait queue.
> I did notice that the format of the files your example generates is
> different. I will try to clarify:
>
> - 8 files in total:
>
> -- file_123_type1.ext1
> -- file_123_type1.ext2
>
> -- file_123_type2.ext1
> -- file_123_type2.ext2
>
> -- file_123_type3.ext1
> -- file_123_type3.ext2
>
> -- file_123_type4.ext1
> -- file_123_type4.ext2
>
> For each set of 8 files, "file_123" increments, so the first set of 8 is
> "file_123", and the next set is "file_124" and so on.
>
> When I look at your example, I notice that at the final step (LogAttribute
> after the FetchFile set) the filenames are file_123_<incrementing
> number>.ex(1|2)
>
> My UpdateAttribute before the Notify branch is configured as:
> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
> counterName - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$2')}
>
> The UpdateAttribute before the Wait branch is configured as:
> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
>
> The 4 Wait processors in the Wait branch are configured as:
> All Wait processors:
> Release Signal Identifier - ${groupID}
>
> For each individual Wait processor:
> Signal Counter Name - type1
> Signal Counter Name - type2
> Signal Counter Name - type3
> Signal Counter Name - type4
>
> I am a bit stumped. The best success we had was a configuration with a
> RouteOnAttribute sending each of type1|type2|type3|type4 to their own Wait
> processor, and a similar config for the Notify branch, followed by a final
> Wait/Notify pair that simply ensures we have the correct number of sets.
>
> This configuration did exactly what we want, but unfortunately we had random
> flowfiles stuck in the waitqueue for no apparent reason.
>
> Thanks,
>
> Martijn
>
>
>
> On 31 May 2018 at 05:23, Koji Kawamura <[email protected]> wrote:
>>
>> The order of arrival does not matter.
>>
>> The Wait processor has an 'Expiration Duration' property, which defaults to
>> 10 min. Please adjust it according to your needs, to the longest period you
>> want to wait for a delayed file.
>> If a FlowFile exceeds the duration, it will be sent to the 'expired'
>> relationship, and can be treated differently, e.g. by writing an ERROR log.
>>
>> > If we have a longer wait for a file, we'd like processing for the next
>> > groupid to still be able to continue.
>>
>> In order to achieve that, use Wait Mode = 'Transfer to wait
>> relationship', and the 'wait' relationship should be configured to use
>> FirstInFirstOutPrioritizer.
>> If FirstInFirstOutPrioritizer is not set, the same FlowFile will be
>> processed again while it blocks other FlowFiles.
>> With FirstInFirstOutPrioritizer, the processed FlowFile will be
>> re-queued at the end of wait queue.
>>
>> I've updated my example to make it more realistic, by adding a delay for
>> certain sets and types.
>> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>>
>> Thanks,
>> Koji
>>
>> On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers
>> <[email protected]> wrote:
>> > Cool, that will make things a lot simpler. Does it matter that the ext2
>> > files arrive in random order? Sometimes there can be a very long delay in
>> > some of them showing up, and we have some concerns about the overall flow
>> > blocking. If we have a longer wait for a file, we'd like processing for the
>> > next groupid to still be able to continue.
>> >
>> > Thank you for your help (and for writing Wait/Notify!!)
>> >
>> > Martijn
>> >
>> > On 31 May 2018 at 03:49, Koji Kawamura <[email protected]> wrote:
>> >>
>> >> Glad to hear that was helpful.
>> >>
>> >> "4 same type for each extension", can be treated as "8 distinct types"
>> >> if an extension is included in a type.
>> >> ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2
>> >>
>> >> Then route only 'ab.ex1' (or whichever but just 1 of them) to the Wait
>> >> branch, and the rest to Notify branch.
>> >> That will simplify the flow, if I'm not missing any other requirement.
>> >>
>> >> Thanks!
>> >> Koji
>> >>
>> >> On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers
>> >> <[email protected]> wrote:
>> >> > Hi Koji, Many thanks for your continued assistance!
>> >> >
>> >> >>
>> >> >> - 1 file per second is relatively low in terms of traffic, it should
>> >> >> be processed fine with 1 thread
>> >> >> - A flow like this, which is stateful across different parts of the
>> >> >> flow, works best with a single thread, because using multiple threads
>> >> >> would cause race conditions or concurrency issues if there's any
>> >> >> implementation error
>> >> >
>> >> >
>> >> > Yes, we had similar thoughts.
>> >> >
>> >> >>
>> >> >> - Based on above, I strongly recommend NOT increasing "concurrent
>> >> >> tasks". If you see FlowFiles staying in a wait queue, then there must
>> >> >> be a different issue
>> >> >
>> >> >
>> >> > We don't see many flowfiles stuck in a wait queue, I ran a test over a
>> >> > few hours yesterday that simulates the way in which these files would
>> >> > appear (we would have 4 of "ext1" show up every second, and the "ext2"
>> >> > can show up a few seconds later, and not always in the same order) and
>> >> > we found perhaps 6 flowfiles stuck in a wait queue.
>> >> >
>> >> >>
>> >> >> - Also, using a concurrent tasks number like 400 is too much in general
>> >> >> for all processors. I recommend incrementing it as 2, 3, 4 .. up to 8
>> >> >> or so, only if you see a clear benefit by doing so
>> >> >
>> >> >
>> >> > Indeed, thanks for the suggestion. Once we have the logic finished and
>> >> > tested we will have to optimise this Flow. The next step is to try to
>> >> > load the required processors into MiNiFi, as this will be running on
>> >> > many systems with limited capacity. If we don't manage with MiNiFi, we
>> >> > will still be good, but we prefer to have the smaller footprint and ease
>> >> > of management we can obtain with MiNiFi.
>> >> >
>> >> >>
>> >> >> - The important part of this flow is extracting 'groupId' and 'type'
>> >> >> from file names. Regular Expression needs to be configured properly.
>> >> >> - I recommend using https://regex101.com/ to test your Regular
>> >> >> Expression to see whether it can extract correct groupId and type
>> >> >
>> >> >
>> >> > Yes, we have tested our RegExes for this extensively
>> >> >
>> >> >>
>> >> >>
>> >> >> Lastly, regardless of how many files should be there for 'ext1' and
>> >> >> 'ext2', the flow structure is as simple as below.
>> >> >> Let's say there should be 8 files to start processing those.
>> >> >> 4 x ex1 and 4 x ex2 in your case, but let's think of it as 8 file types.
>> >> >> And I assume the types are known, meaning static, not changing
>> >> >> dynamically.
>> >> >
>> >> >
>> >> > Correct, the format is <groupID><type>.<ext> where:
>> >> >
>> >> > - groupId is unique for each set of 8
>> >> > - type has 4 variants (ab, cd, ef, gh), the same type-set for each ext
>> >> >
>> >> >> So, the rule is, "a set of files consists of 8 files, and a set should
>> >> >> wait to be processed until all 8 files are ready", that's all.
>> >> >
>> >> >
>> >> > For our use case it is important that we have exact "positive
>> >> > identification" of each file.
>> >> >
>> >> >>
>> >> >> Then, the flow should be designed like below:
>> >> >> 1. List files, each file will be sent as a FlowFile
>> >> >
>> >> >
>> >> > Correct - we have several different listfiles for other sections of the
>> >> > flow, we are actually collecting many different sets, all variants of
>> >> > the above. However, those are far simpler (sets of 2 - ext1 and ext2
>> >> > only)
>> >> >
>> >> >>
>> >> >> 2. Extract groupId and type from filename
>> >> >
>> >> >
>> >> > Correct
>> >> >
>> >> >>
>> >> >> 3. Route FlowFiles into two branches, let's call these 'Notify' branch
>> >> >> and 'Wait' branch, and pass only 1 type for a set to Wait-branch, and
>> >> >> the rest 7 types to Notify-branch
>> >> >
>> >> >
>> >> > OK, we currently split Notify branch to "all ext1" and wait branch to
>> >> > "all ext2"
>> >> >
>> >> >>
>> >> >> At Notify branch (for the rest 7 types FlowFile, e.g. type 2, 3, 4 ...
>> >> >> 8)
>> >> >
>> >> >
>> >> > As mentioned, we only have 4 distinct types.
>> >> >
>> >> >>
>> >> >> 1. Notify that the type for a group has arrived.
>> >> >> 2. Discard the FlowFile, because there's nothing to do with it in this
>> >> >> branch
>> >> >
>> >> >
>> >> >
>> >> >>
>> >> >> At Wait branch (for the type 1 FlowFile)
>> >> >> 1. Wait for type 2 for the groupId.
>> >> >> 2. Wait for type 3 for the groupId, type 4, 5 and so on
>> >> >> 3. After passing Wait for type 8, it can guarantee that all 8 files
>> >> >> are available (unless there is any other program deleting those)
>> >> >> 4. Get actual file content using FetchFile, and process it
>> >> >
>> >> >
>> >> > Besides the "4 same types for each extension", this is configured as you
>> >> > describe.
>> >> >
>> >> >>
>> >> >> I hope it helps.
>> >> >>
>> >> >
>> >> > It does, thanks. I will extract this portion of the flow, sanitise, and
>> >> > send it along - easier to see than to describe :)
>> >> >
>> >> >
>> >> >>
>> >> >> Thanks,
>> >> >> Koji
>> >> >
>> >> >
>> >> > Thank you so much once again!
>> >> >
>> >> > Martijn
>> >> >
>> >> >
>> >> >
>> >> >>
>> >> >>
>> >> >> On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers
>> >> >> <[email protected]>
>> >> >> wrote:
>> >> >> > Hey Pierre,
>> >> >> >
>> >> >> > Yes, we suspected as much, but we are only seeing this with the Wait
>> >> >> > processor. Possibly because that is the only "blocking" we have in
>> >> >> > this flow.
>> >> >> >
>> >> >> > Thanks for the clarification, much appreciated!
>> >> >> >
>> >> >> > Martijn
>> >> >> >
>> >> >> > On 30 May 2018 at 10:30, Pierre Villard
>> >> >> > <[email protected]>
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> I'll let Koji give more information about the Wait/Notify, he is
>> >> >> >> clearly the expert here.
>> >> >> >>
>> >> >> >> I'm just jumping in regarding your "and when viewing the queue, the
>> >> >> >> dialog states that the queue is empty.". You're seeing this behavior
>> >> >> >> because, even though the UI shows some flow files in the queue, the
>> >> >> >> flow files are currently locked in the session of the running
>> >> >> >> processor and you won't see flow files currently processed in a
>> >> >> >> session when listing a queue. If you stop the processor, the session
>> >> >> >> will be closed and you'll be able to list the queue and see the flow
>> >> >> >> files.
>> >> >> >>
>> >> >> >> I recall discussions in the past to improve the UX for this. Not sure
>> >> >> >> we have a JIRA for it though...
>> >> >> >>
>> >> >> >> Pierre
>> >> >> >>
>> >> >> >> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers
>> >> >> >> <[email protected]>:
>> >> >> >>>
>> >> >> >>> Hi Koji,
>> >> >> >>>
>> >> >> >>> Thank you for responding. I had adjusted the run schedule to closely
>> >> >> >>> mimic our environment. We are expecting about 1 file per second or
>> >> >> >>> so. We are also seeing some random "orphans" sitting in a wait queue
>> >> >> >>> every now and again that don't trigger a debug message, and when
>> >> >> >>> viewing the queue, the dialog states that the queue is empty.
>> >> >> >>>
>> >> >> >>> We found the random "no signal found" issue to be significantly
>> >> >> >>> decreased when we increase the "concurrent tasks" to something large -
>> >> >> >>> currently set to 400 for all wait and notify processors.
>> >> >> >>>
>> >> >> >>> I do need to mention that our requirements had changed since you made
>> >> >> >>> the template, in that we are looking for a set of 8 files - 4 x
>> >> >> >>> "ext1" and 4 x "ext2" both with the same pattern: <groupid><type (4
>> >> >> >>> of these)>.ext1 or ext2
>> >> >> >>>
>> >> >> >>> We found that the best way to make this work was to add another
>> >> >> >>> wait/notify pair, each processor coming after the ones already there,
>> >> >> >>> with a simple counter against the groupID.
>> >> >> >>>
>> >> >> >>> I will export a template for you, many thanks for your help - I just
>> >> >> >>> need to spend some time sanitising the various fields etc.
>> >> >> >>>
>> >> >> >>> Many thanks once again for your kind assistance.
>> >> >> >>>
>> >> >> >>> Martijn
>> >> >> >>>
>> >> >> >>> On 30 May 2018 at 08:14, Koji Kawamura <[email protected]>
>> >> >> >>> wrote:
>> >> >> >>>>
>> >> >> >>>> Hi Martijn,
>> >> >> >>>>
>> >> >> >>>> In my template, I was using 'Run Schedule' as '5 secs' for the Wait
>> >> >> >>>> processors to avoid overusing CPU resources. However, if you expect
>> >> >> >>>> more throughput, it should be lowered.
>> >> >> >>>> I changed Run Schedule to 0 sec and passed 100 groups of files (400
>> >> >> >>>> files, because 4 files are 1 set in my example); they reached the
>> >> >> >>>> expected goal of the flow without issue.
>> >> >> >>>>
>> >> >> >>>> If you can share your flow and example input file volume (hundreds of
>> >> >> >>>> files were fine in my flow), I may be able to provide more useful
>> >> >> >>>> comments.
>> >> >> >>>>
>> >> >> >>>> Thanks,
>> >> >> >>>> Koji
>> >> >> >>>>
>> >> >> >>>> On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers
>> >> >> >>>> <[email protected]> wrote:
>> >> >> >>>> > Hi Koji,
>> >> >> >>>> >
>> >> >> >>>> > I am seeing many issues to get this to run reliably. When running
>> >> >> >>>> > this with a few flowfiles at a time, and stepping through by
>> >> >> >>>> > switching processors on and off it works mostly fine, but running
>> >> >> >>>> > this at volume I receive many errors about "no release signal found"
>> >> >> >>>> >
>> >> >> >>>> > I have tried to fix this in a few different ways, but the issue keeps
>> >> >> >>>> > coming back. This is also not consistent at all - different wait
>> >> >> >>>> > processors will block different flowfiles at different times, without
>> >> >> >>>> > changing any configuration. Stop/Start the flow, and different queues
>> >> >> >>>> > will fill up. Do you have any ideas what could be causing this
>> >> >> >>>> > behavior? I checked the DistributedMapCache Server/Client components,
>> >> >> >>>> > and they all appear to be working OK.
>> >> >> >>>> >
>> >> >> >>>> > Thanks,
>> >> >> >>>> >
>> >> >> >>>> > Martijn
>> >> >> >>>> >
>> >> >> >>>> > On 28 May 2018 at 05:11, Koji Kawamura
>> >> >> >>>> > <[email protected]>
>> >> >> >>>> > wrote:
>> >> >> >>>> >>
>> >> >> >>>> >> Hi Martijn,
>> >> >> >>>> >>
>> >> >> >>>> >> An alternative approach is to use the Wait/Notify processors.
>> >> >> >>>> >> I have developed a similar flow using those before, and I believe it
>> >> >> >>>> >> will work for your case.
>> >> >> >>>> >> A NiFi flow template is available here:
>> >> >> >>>> >>
>> >> >> >>>> >> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>> >> >> >>>> >>
>> >> >> >>>> >> Hope this helps,
>> >> >> >>>> >> Koji
>> >> >> >>>> >>
>> >> >> >>>> >>
>> >> >> >>>> >> On Sun, May 27, 2018 at 11:48 PM, Andrew Grande
>> >> >> >>>> >> <[email protected]>
>> >> >> >>>> >> wrote:
>> >> >> >>>> >> > Martijn,
>> >> >> >>>> >> >
>> >> >> >>>> >> > Here's an idea you could explore. Have the ListFile processor work
>> >> >> >>>> >> > as usual and create a custom component (start with a scripting one
>> >> >> >>>> >> > to prototype) grouping the filenames as needed. I don't know if the
>> >> >> >>>> >> > number of files in a set is different every time, so trying to be
>> >> >> >>>> >> > more robust.
>> >> >> >>>> >> >
>> >> >> >>>> >> > Once you group and count the set, you can transfer the names to the
>> >> >> >>>> >> > success relationship. Ignore otherwise and wait until the set is
>> >> >> >>>> >> > full.
>> >> >> >>>> >> >
>> >> >> >>>> >> > Andrew
>> >> >> >>>> >> >
>> >> >> >>>> >> >
>> >> >> >>>> >> > On Sun, May 27, 2018, 7:29 AM Martijn Dekkers
>> >> >> >>>> >> > <[email protected]>
>> >> >> >>>> >> > wrote:
>> >> >> >>>> >> >>
>> >> >> >>>> >> >> Hello all,
>> >> >> >>>> >> >>
>> >> >> >>>> >> >> I am trying to work out an issue with little success.
>> >> >> >>>> >> >>
>> >> >> >>>> >> >> I need to ingest files generated by some application. I can only
>> >> >> >>>> >> >> ingest these files when a specific set exists. For example:
>> >> >> >>>> >> >>
>> >> >> >>>> >> >> file_123_456_ab.ex1
>> >> >> >>>> >> >> file_123_456_cd.ex1
>> >> >> >>>> >> >> file_123_456_ef.ex1
>> >> >> >>>> >> >> file_123_456_gh.ex1
>> >> >> >>>> >> >> file_123_456.ex2
>> >> >> >>>> >> >>
>> >> >> >>>> >> >> Only when a set like that exists should I pick them up into the
>> >> >> >>>> >> >> Flow. The parts I am looking for to "group" would be "ab.ex1",
>> >> >> >>>> >> >> "cd.ex1", "ef.ex1", "gh.ex1", ".ex2".
>> >> >> >>>> >> >>
>> >> >> >>>> >> >> I tried to do this with some expression, but couldn't work it out.
>> >> >> >>>> >> >>
>> >> >> >>>> >> >> What would be the best way to achieve this?
>> >> >> >>>> >> >>
>> >> >> >>>> >> >> Many thanks!
>> >> >> >>>> >
>> >> >> >>>> >
>> >> >> >>>
>> >> >> >>>
>> >> >> >>
>> >> >> >
>> >> >
>> >> >
>> >
>> >
>
>