BTW, which version are you using? I hope it is 1.4.0 or higher. There
was an issue that would affect your use case:
https://issues.apache.org/jira/browse/NIFI-4028
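
To make the difference between the two RegExes in the quoted message below
concrete, here is a minimal sketch in Python. It is not NiFi Expression
Language, and the helper names (extract, route) are hypothetical, for
illustration only. It assumes the filenames follow the file_123_type3.ext1
pattern described in the thread. Note that the current pattern does not match
.ext2 files at all, and a regex replace leaves a non-matching string unchanged,
so I would expect groupID to end up as the whole filename for those files.

```python
import re

# Pattern from the current configuration: matches only *.ext1 and keeps the
# trailing underscore inside the first capture group.
CURRENT = re.compile(r"^file_(\d{3}_)(\w+)\.ext1$")
# Suggested pattern: the extension becomes part of the counter name.
SUGGESTED = re.compile(r"^file_(\d+)_(\w+\.ext\d)$")


def extract(pattern, filename):
    """Return (groupID, counterName), or None when the filename does not match."""
    m = pattern.match(filename)
    return (m.group(1), m.group(2)) if m else None


def route(filename, wait_type="type1.ext1"):
    """Hypothetical routing rule: one type goes to Wait, the other seven to Notify."""
    parsed = extract(SUGGESTED, filename)
    if parsed is None:
        return "unmatched"
    return "wait" if parsed[1] == wait_type else "notify"


if __name__ == "__main__":
    for name in ("file_123_type3.ext1", "file_123_type3.ext2"):
        print(name, extract(CURRENT, name), extract(SUGGESTED, name), route(name))
```

With the suggested pattern, all 8 files of a set share the same groupID and
each one gets a distinct counterName, which is what the Wait branch relies on.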

On Thu, May 31, 2018 at 4:51 PM, Koji Kawamura <[email protected]> wrote:
> Hi Martijn,
>
> I used the filename increment pattern based on your first filename example.
> file_123_456_ab.ex1
> I increment the 456 part. If it changed, that's fine.
>
> Your current configurations look like below:
> - Given a filename: file_123_type3.ext1
> - matched with a RegEx: ^file_(\d{3}_)(\w+)\.ext1$
> - groupID will be: 123_ (including the underscore)
> - counterName will be: type3
>
> I was suggesting including the extension in the counterName.
> How about changing the RegEx to:
> - RegEx: ^file_(\d+)_(\w+\.ext\d)$
> - groupID will be: 123
> - counterName will be: type3.ext1
>
> Then you can route type1.ext1 to the Wait branch and the other 7 to Notify.
> In the Wait branch, you need 7 Wait processors.
>
> It would be faster to debug if you could share your flow template.
>
> Thanks,
> Koji
>
> On Thu, May 31, 2018 at 3:15 PM, Martijn Dekkers <[email protected]> 
> wrote:
>> Thank you Koji,
>>
>> I have tried once again, using your updated example. Unfortunately, things
>> still get stuck at the first Wait processor's wait queue.
>> I did notice that the format of the files your example generates is
>> different. I will try to clarify:
>>
>> - 8 files in total:
>>
>> -- file_123_type1.ext1
>> -- file_123_type1.ext2
>>
>> -- file_123_type2.ext1
>> -- file_123_type2.ext2
>>
>> -- file_123_type3.ext1
>> -- file_123_type3.ext2
>>
>> -- file_123_type4.ext1
>> -- file_123_type4.ext2
>>
>> For each set of 8 files, "file_123" increments, so the first set of 8 is
>> "file_123", and the next set is "file_124" and so on.
>>
>> When I look at your example, I notice that at the final step (LogAttribute
>> after the FetchFile set) the filenames are file_123_<incrementing
>> number>.ex(1|2)
>>
>> My UpdateAttribute before the Notify branch is configured as:
>> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
>> counterName - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$2')}
>>
>> The UpdateAttribute before the Wait branch is configured as:
>> groupID - ${fileName:replaceFirst('^file_(\d{3}_)(\w+)\.ext1$','$1')}
>>
>> The 4 Wait processors in the Wait branch are configured as:
>> All Wait processors:
>> Release Signal Identifier - ${groupID}
>>
>> For each individual Wait processor:
>> Signal Counter Name - type1
>> Signal Counter Name - type2
>> Signal Counter Name - type3
>> Signal Counter Name - type4
>>
>> I am a bit stumped. The best success we had was a configuration with a
>> RouteOnAttribute sending each of type1|type2|type3|type4 to their own Wait
>> processor, and a similar config for the Notify branch, followed by a final
>> Wait/Notify pair that simply ensures we have the correct number of sets.
>>
>> This configuration did exactly what we want, but unfortunately we had random
>> flowfiles stuck in the wait queue for no apparent reason.
>>
>> Thanks,
>>
>> Martijn
>>
>>
>>
>> On 31 May 2018 at 05:23, Koji Kawamura <[email protected]> wrote:
>>>
>>> The order of arrival does not matter.
>>>
>>> The Wait processor has an 'Expiration Duration' setting, which defaults to
>>> 10 min. Please adjust it according to your needs, i.e. the longest period to
>>> wait for a delayed file.
>>> If a FlowFile exceeds the duration, it will be sent to the 'expired'
>>> relationship, and can be treated differently, e.g. by writing an ERROR log.
>>> > If we have a longer wait for a file, we'd like processing for the next
>>> > groupid to still be able to continue.
>>>
>>> In order to achieve that, use Wait Mode = 'Transfer to wait
>>> relationship', and the 'wait' relationship should be configured to use
>>> FirstInFirstOutPrioritizer.
>>> If FirstInFirstOutPrioritizer is not set, the same FlowFile will be
>>> processed again while it blocks other FlowFiles.
>>> With FirstInFirstOutPrioritizer, the processed FlowFile will be
>>> re-queued at the end of wait queue.
>>>
>>> I've updated my example to make it more realistic, by adding a delay for
>>> a certain set and type.
>>> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>>>
>>> Thanks,
>>> Koji
>>>
>>> On Thu, May 31, 2018 at 10:56 AM, Martijn Dekkers
>>> <[email protected]> wrote:
>>> > Cool, that will make things a lot simpler. Does it matter that the ext2
>>> > files arrive in random order? Sometimes there can be a very long delay in
>>> > some of them showing up, and we have some concerns about the overall flow
>>> > blocking. If we have a longer wait for a file, we'd like processing for the
>>> > next groupid to still be able to continue.
>>> >
>>> > Thank you for your help (and for writing Wait/Notify!!)
>>> >
>>> > Martijn
>>> >
>>> > On 31 May 2018 at 03:49, Koji Kawamura <[email protected]> wrote:
>>> >>
>>> >> Glad to hear that was helpful.
>>> >>
>>> >> "4 same type for each extension", can be treated as "8 distinct types"
>>> >> if an extension is included in a type.
>>> >> ab.ex1, cd.ex1, ef.ex1, gh.ex1, ab.ex2, cd.ex2, ef.ex2, gh.ex2
>>> >>
>>> >> Then route only 'ab.ex1' (or whichever but just 1 of them) to the Wait
>>> >> branch, and the rest to Notify branch.
>>> >> That will simplify the flow, if I'm not missing any other requirement.
>>> >>
>>> >> Thanks!
>>> >> Koji
>>> >>
>>> >> On Thu, May 31, 2018 at 10:30 AM, Martijn Dekkers
>>> >> <[email protected]> wrote:
>>> >> > Hi Koji, Many thanks for your continued assistance!
>>> >> >
>>> >> >>
>>> >> >> - 1 file per second is relatively low in terms of traffic, it should
>>> >> >> be processed fine with 1 thread
>>> >> >> - A flow like this, which is stateful across different parts of the
>>> >> >> flow, works best with a single thread, because using multiple threads
>>> >> >> would cause race conditions or concurrency issues if there's any
>>> >> >> implementation error
>>> >> >
>>> >> >
>>> >> > Yes, we had similar thoughts.
>>> >> >
>>> >> >>
>>> >> >> - Based on the above, I strongly recommend NOT increasing "concurrent
>>> >> >> tasks". If you see FlowFiles staying in a wait queue, then there must
>>> >> >> be a different issue
>>> >> >
>>> >> >
>>> >> > We don't see many flowfiles stuck in a wait queue, I ran a test over a few
>>> >> > hours yesterday that simulates the way in which these files would appear
>>> >> > (we would have 4 of "ext1" show up every second, and the "ext2" can show up
>>> >> > a few seconds later, and not always in the same order) and we found perhaps
>>> >> > 6 flowfiles stuck in a wait queue.
>>> >> >
>>> >> >>
>>> >> >> - Also, a concurrent tasks number like 400 is too much in general
>>> >> >> for all processors. I recommend incrementing it as 2, 3, 4 .. up to 8
>>> >> >> or so, only if you see a clear benefit from doing so
>>> >> >
>>> >> >
>>> >> > Indeed, thanks for the suggestion. Once we have the logic finished and
>>> >> > tested we will have to optimise this Flow. The next step is to try to load
>>> >> > the required processors into MiNiFi, as this will be running on many systems
>>> >> > with limited capacity. If we don't manage with MiNiFi, we will still be
>>> >> > good, but we prefer to have the smaller footprint and ease of management we
>>> >> > can obtain with MiNiFi.
>>> >> >
>>> >> >>
>>> >> >> - The important part of this flow is extracting 'groupId' and 'type'
>>> >> >> from file names. Regular Expression needs to be configured properly.
>>> >> >> - I recommend using https://regex101.com/ to test your Regular
>>> >> >> Expression to see whether it can extract correct groupId and type
>>> >> >
>>> >> >
>>> >> > Yes, we have tested our RegExes for this extensively
>>> >> >
>>> >> >>
>>> >> >>
>>> >> >> Lastly, regardless of how many files should be there for 'ext1' and
>>> >> >> 'ext2', the flow structure is simple as below.
>>> >> >> Let's say there should be 8 files to start processing those.
>>> >> >> 4 x ex1, and 4 x ex2 in your case, but let's think of it as 8 file types.
>>> >> >> And I assume the types are known, meaning static, not dynamically
>>> >> >> changing.
>>> >> >
>>> >> >
>>> >> > Correct, the format is <groupID><type>.<ext> where:
>>> >> >
>>> >> > - groupId is unique for each set of 8
>>> >> > - type has 4 variants (ab, cd, ef, gh), the same type-set for each ext
>>> >> >
>>> >> >> So, the rule is, "a set of files consists of 8 files, and a set should
>>> >> >> wait to be processed until all 8 files are ready", that's all.
>>> >> >
>>> >> >
>>> >> > For our use case it is important that we have exact "positive
>>> >> > identification" of each file.
>>> >> >
>>> >> >>
>>> >> >> Then, the flow should be designed like below:
>>> >> >> 1. List files, each file will be sent as a FlowFile
>>> >> >
>>> >> >
>>> >> > Correct - we have several different listfiles for other sections of the
>>> >> > flow, we are actually collecting many different sets, all variants of the
>>> >> > above. However, those are far simpler (sets of 2 - ext1 and ext2 only)
>>> >> >
>>> >> >>
>>> >> >> 2. Extract groupId and type from filename
>>> >> >
>>> >> >
>>> >> > Correct
>>> >> >
>>> >> >>
>>> >> >> 3. Route FlowFiles into two branches, let's call these 'Notify' branch
>>> >> >> and 'Wait' branch, and pass only 1 type for a set to Wait-branch, and
>>> >> >> the rest 7 types to Notify-branch
>>> >> >
>>> >> >
>>> >> > OK, we currently split Notify branch to "all ext1" and wait branch to "all
>>> >> > ext2"
>>> >> >
>>> >> >>
>>> >> >> At Notify branch (for the rest 7 types FlowFile, e.g. type 2, 3, 4 ...
>>> >> >> 8)
>>> >> >
>>> >> >
>>> >> > As mentioned, we only have 4 distinct types.
>>> >> >
>>> >> >>
>>> >> >> 1. Notify that the type for a group has arrived.
>>> >> >> 2. Discard the FlowFile, because there's nothing to do with it in this
>>> >> >> branch
>>> >> >
>>> >> >
>>> >> >
>>> >> >>
>>> >> >> At Wait branch (for the type 1 FlowFile)
>>> >> >> 1. Wait for type 2 for the groupId.
>>> >> >> 2. Wait for type 3 for the groupId, type 4, 5 and so on
>>> >> >> 3. After passing Wait for type 8, it can guarantee that all 8 files
>>> >> >> are available (unless there is any other program deleting those)
>>> >> >> 4. Get actual file content using FetchFile, and process it
>>> >> >
>>> >> >
>>> >> > Besides the "4 same types for each extension", this is configured as you
>>> >> > describe.
>>> >> >
>>> >> >>
>>> >> >> I hope it helps.
>>> >> >>
>>> >> >
>>> >> > It does, thanks. I will extract this portion of the flow, sanitise, and
>>> >> > send it along - easier to see than to describe :)
>>> >> >
>>> >> >
>>> >> >>
>>> >> >> Thanks,
>>> >> >> Koji
>>> >> >
>>> >> >
>>> >> > Thank you so much once again!
>>> >> >
>>> >> > Martijn
>>> >> >
>>> >> >
>>> >> >
>>> >> >>
>>> >> >>
>>> >> >> On Wed, May 30, 2018 at 6:10 PM, Martijn Dekkers
>>> >> >> <[email protected]>
>>> >> >> wrote:
>>> >> >> > Hey Pierre,
>>> >> >> >
>>> >> >> > Yes, we suspected as much, but we are only seeing this with the Wait
>>> >> >> > processor. Possibly because that is the only "blocking" we have in this
>>> >> >> > flow.
>>> >> >> >
>>> >> >> > Thanks for the clarification, much appreciated!
>>> >> >> >
>>> >> >> > Martijn
>>> >> >> >
>>> >> >> > On 30 May 2018 at 10:30, Pierre Villard
>>> >> >> > <[email protected]>
>>> >> >> > wrote:
>>> >> >> >>
>>> >> >> >> I'll let Koji give more information about the Wait/Notify, he is clearly
>>> >> >> >> the expert here.
>>> >> >> >>
>>> >> >> >> I'm just jumping in regarding your "and when viewing the queue, the dialog
>>> >> >> >> states that the queue is empty.". You're seeing this behavior because, even
>>> >> >> >> though the UI shows some flow files in the queue, the flow files are
>>> >> >> >> currently locked in the session of the running processor and you won't see
>>> >> >> >> flow files currently processed in a session when listing a queue. If you
>>> >> >> >> stop the processor, the session will be closed and you'll be able to list
>>> >> >> >> the queue and see the flow files.
>>> >> >> >>
>>> >> >> >> I recall discussions in the past to improve the UX for this. Not sure we
>>> >> >> >> have a JIRA for it though...
>>> >> >> >>
>>> >> >> >> Pierre
>>> >> >> >>
>>> >> >> >> 2018-05-30 8:26 GMT+02:00 Martijn Dekkers
>>> >> >> >> <[email protected]>:
>>> >> >> >>>
>>> >> >> >>> Hi Koji,
>>> >> >> >>>
>>> >> >> >>> Thank you for responding. I had adjusted the run schedule to closely
>>> >> >> >>> mimic our environment. We are expecting about 1 file per second or so.
>>> >> >> >>> We are also seeing some random "orphans" sitting in a wait queue every
>>> >> >> >>> now and again that don't trigger a debug message, and when viewing the
>>> >> >> >>> queue, the dialog states that the queue is empty.
>>> >> >> >>>
>>> >> >> >>> We found the random "no signal found" issue to be significantly decreased
>>> >> >> >>> when we increase the "concurrent tasks" to something large - currently set
>>> >> >> >>> to 400 for all wait and notify processors.
>>> >> >> >>>
>>> >> >> >>> I do need to mention that our requirements had changed since you made the
>>> >> >> >>> template, in that we are looking for a set of 8 files - 4 x "ext1" and 4 x
>>> >> >> >>> "ext2" both with the same pattern: <groupid><type (4 of these)>.ext1
>>> >> >> >>> or ext2
>>> >> >> >>>
>>> >> >> >>> We found that the best way to make this work was to add another
>>> >> >> >>> wait/notify pair, each processor coming after the ones already there, with
>>> >> >> >>> a simple counter against the groupID.
>>> >> >> >>>
>>> >> >> >>> I will export a template for you, many thanks for your help - I just need
>>> >> >> >>> to spend some time sanitising the various fields etc.
>>> >> >> >>>
>>> >> >> >>> Many thanks once again for your kind assistance.
>>> >> >> >>>
>>> >> >> >>> Martijn
>>> >> >> >>>
>>> >> >> >>> On 30 May 2018 at 08:14, Koji Kawamura <[email protected]>
>>> >> >> >>> wrote:
>>> >> >> >>>>
>>> >> >> >>>> Hi Martijn,
>>> >> >> >>>>
>>> >> >> >>>> In my template, I was using 'Run Schedule' as '5 secs' for the Wait
>>> >> >> >>>> processors to avoid overusing CPU resources. However, if you expect
>>> >> >> >>>> more throughput, it should be lowered.
>>> >> >> >>>> I changed Run Schedule to 0 sec and passed 100 groups of files (400
>>> >> >> >>>> files, because 4 files are 1 set in my example), and they reached
>>> >> >> >>>> the expected goal of the flow without issue.
>>> >> >> >>>>
>>> >> >> >>>> If you can share your flow and example input file volume (hundreds of
>>> >> >> >>>> files were fine in my flow), I may be able to provide more useful
>>> >> >> >>>> comments.
>>> >> >> >>>>
>>> >> >> >>>> Thanks,
>>> >> >> >>>> Koji
>>> >> >> >>>>
>>> >> >> >>>> On Wed, May 30, 2018 at 2:08 PM, Martijn Dekkers
>>> >> >> >>>> <[email protected]> wrote:
>>> >> >> >>>> > Hi Koji,
>>> >> >> >>>> >
>>> >> >> >>>> > I am having many issues getting this to run reliably. When running this
>>> >> >> >>>> > with a few flowfiles at a time, and stepping through by switching
>>> >> >> >>>> > processors on and off it works mostly fine, but running this at volume I
>>> >> >> >>>> > receive many errors about "no release signal found"
>>> >> >> >>>> >
>>> >> >> >>>> > I have tried to fix this in a few different ways, but the issue keeps
>>> >> >> >>>> > coming back. This is also not consistent at all - different wait
>>> >> >> >>>> > processors will block different flowfiles at different times, without
>>> >> >> >>>> > changing any configuration. Stop/Start the flow, and different queues will
>>> >> >> >>>> > fill up. Do you have any ideas what could be causing this behavior? I
>>> >> >> >>>> > checked the DistributedMapCache Server/Client components, and they all
>>> >> >> >>>> > appear to be working OK.
>>> >> >> >>>> >
>>> >> >> >>>> > Thanks,
>>> >> >> >>>> >
>>> >> >> >>>> > Martijn
>>> >> >> >>>> >
>>> >> >> >>>> > On 28 May 2018 at 05:11, Koji Kawamura
>>> >> >> >>>> > <[email protected]>
>>> >> >> >>>> > wrote:
>>> >> >> >>>> >>
>>> >> >> >>>> >> Hi Martijn,
>>> >> >> >>>> >>
>>> >> >> >>>> >> An alternative approach is using Wait/Notify processors.
>>> >> >> >>>> >> I have developed a similar flow using those before, and I believe it
>>> >> >> >>>> >> will work for your case.
>>> >> >> >>>> >> A NiFi flow template is available here:
>>> >> >> >>>> >>
>>> >> >> >>>> >> https://gist.github.com/ijokarumawak/06b3b071eeb4d10d8a27507981422edd
>>> >> >> >>>> >>
>>> >> >> >>>> >> Hope this helps,
>>> >> >> >>>> >> Koji
>>> >> >> >>>> >>
>>> >> >> >>>> >>
>>> >> >> >>>> >> On Sun, May 27, 2018 at 11:48 PM, Andrew Grande
>>> >> >> >>>> >> <[email protected]>
>>> >> >> >>>> >> wrote:
>>> >> >> >>>> >> > Martijn,
>>> >> >> >>>> >> >
>>> >> >> >>>> >> > Here's an idea you could explore. Have the ListFile processor work as
>>> >> >> >>>> >> > usual and create a custom component (start with a scripting one to
>>> >> >> >>>> >> > prototype) grouping the filenames as needed. I don't know if the number
>>> >> >> >>>> >> > of files in a set is different every time, so trying to be more robust.
>>> >> >> >>>> >> >
>>> >> >> >>>> >> > Once you group and count the set, you can transfer the names to the
>>> >> >> >>>> >> > success relationship. Ignore otherwise and wait until the set is full.
>>> >> >> >>>> >> >
>>> >> >> >>>> >> > Andrew
>>> >> >> >>>> >> >
>>> >> >> >>>> >> >
>>> >> >> >>>> >> > On Sun, May 27, 2018, 7:29 AM Martijn Dekkers
>>> >> >> >>>> >> > <[email protected]>
>>> >> >> >>>> >> > wrote:
>>> >> >> >>>> >> >>
>>> >> >> >>>> >> >> Hello all,
>>> >> >> >>>> >> >>
>>> >> >> >>>> >> >> I am trying to work out an issue with little success.
>>> >> >> >>>> >> >>
>>> >> >> >>>> >> >> I need to ingest files generated by some application. I can only ingest
>>> >> >> >>>> >> >> these files when a specific set exists. For example:
>>> >> >> >>>> >> >>
>>> >> >> >>>> >> >> file_123_456_ab.ex1
>>> >> >> >>>> >> >> file_123_456_cd.ex1
>>> >> >> >>>> >> >> file_123_456_ef.ex1
>>> >> >> >>>> >> >> file_123_456_gh.ex1
>>> >> >> >>>> >> >> file_123_456.ex2
>>> >> >> >>>> >> >>
>>> >> >> >>>> >> >> Only when a set like that exists should I pick them up into the Flow.
>>> >> >> >>>> >> >> The parts I am looking for to "group" would be "ab.ex1", "cd.ex1",
>>> >> >> >>>> >> >> "ef.ex1", "gh.ex1", ".ex2".
>>> >> >> >>>> >> >>
>>> >> >> >>>> >> >> I tried to do this with some expression, but couldn't work it out.
>>> >> >> >>>> >> >>
>>> >> >> >>>> >> >> What would be the best way to achieve this?
>>> >> >> >>>> >> >>
>>> >> >> >>>> >> >> Many thanks!
>>> >> >> >>>> >
>>> >> >> >>>> >
>>> >> >> >>>
>>> >> >> >>>
>>> >> >> >>
>>> >> >> >
>>> >> >
>>> >> >
>>> >
>>> >
>>
>>
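
P.S. For anyone reading the thread later, the Wait branch behaviour discussed
above (one FlowFile per set waits until the other 7 types have notified, and a
not-yet-complete FlowFile is re-queued at the end of the wait queue) can be
sketched roughly as below. This is only a simplified model in Python under my
own assumptions: the real flow keeps signal counters in the DistributedMapCache
and uses a chain of Wait processors, and the names here (notify, wait_pass,
REQUIRED) are hypothetical.

```python
from collections import defaultdict, deque

# The 7 counter names a set must collect; type1.ext1 is the file that waits.
REQUIRED = [f"type{t}.ext{e}" for t in range(1, 5) for e in (1, 2)]
REQUIRED.remove("type1.ext1")


def notify(signals, group_id, counter_name):
    """Record that one file type has arrived for a group (Notify branch)."""
    signals[group_id].add(counter_name)


def wait_pass(queue, signals):
    """Visit every queued group once; return the groups released in this pass.

    Groups that are still incomplete go back to the end of the queue, so a
    delayed set does not block the sets queued behind it.
    """
    released = []
    for _ in range(len(queue)):
        group_id = queue.popleft()
        if all(name in signals[group_id] for name in REQUIRED):
            released.append(group_id)
        else:
            queue.append(group_id)
    return released


if __name__ == "__main__":
    signals = defaultdict(set)
    queue = deque(["123", "124"])      # type1.ext1 of each set is waiting
    for name in REQUIRED:
        notify(signals, "124", name)   # set 124 is complete
    for name in REQUIRED[:-1]:
        notify(signals, "123", name)   # set 123 still misses one file
    print(wait_pass(queue, signals), list(queue))
```

The point of the sketch is only the ordering: the complete set is released even
though an incomplete set was queued ahead of it.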
