Prabhu,

I suspect you need to rethink how you use concurrency in your workflow. Let
me give you an example:

You spoke about 10 concurrent GetFile threads reading a repository and
the ordering that results:

Suppose you have 2 threads consuming:

file1 - 10 MB
file2 - 20 MB
file3 - 50 MB
file4 - 10 MB
file5 - 10 MB
file6 - 10 MB

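All else being equal, assume both threads consume and dispatch files at the
same speed. How can you guarantee that thread 1 will consume file5 (i.e. a
strict alternation such as t1-f1, t2-f2, t1-f3, t2-f4, t1-f5, t2-f6)? You
cannot: while thread 1 is still busy with the 50 MB file3, thread 2 is free
to pick up file4, file5 and file6.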
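
To make the point concrete, here is a minimal sketch in Python (not NiFi code) that simulates the six files above. It assumes processing time is proportional to file size and that the next file in the queue always goes to whichever thread frees up first; the simulate function and its parameters are hypothetical names used only for illustration.

```python
import heapq

def simulate(sizes_mb, workers=2):
    """Hand out files in queue order to whichever worker frees up first.

    Assumption: processing time is proportional to size (1 MB = 1 time unit).
    Returns (assignments, completion_order).
    """
    # Min-heap of (time at which the worker becomes free, worker id).
    free_at = [(0, w) for w in range(1, workers + 1)]
    heapq.heapify(free_at)
    assignments = []
    finished = []
    for name, size in sizes_mb:
        start, worker = heapq.heappop(free_at)
        end = start + size
        assignments.append((name, worker))
        finished.append((end, name))
        heapq.heappush(free_at, (end, worker))
    # Files sorted by the time their processing ends.
    completion_order = [name for _, name in sorted(finished)]
    return assignments, completion_order

files = [("file1", 10), ("file2", 20), ("file3", 50),
         ("file4", 10), ("file5", 10), ("file6", 10)]
assignments, completion_order = simulate(files)
print(assignments)
print(completion_order)
```

Under these assumptions file5 ends up on thread 2, and file3 is the last file to complete even though it was the third one picked up.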

Or, as Brandon DeVries clearly put it a long while ago [1]:

"Just because a FlowFile begins processing first doesn't mean it will
complete first (assuming the processor has multiple concurrent tasks)"

Brandon goes further and provides some suggestions that may help you bin
your flowfiles and records together, but in any case...


Assuming the file is named after a date (e.g.
file_2017-03-03T010101.csv), have you considered using UpdateAttribute to
parse the filename into a date and convert that date to epoch time (which
happens to be an increasing number) as a first-level index / prioritizer?

This way you could have:

GetFile (single thread) -- connection with FIFO prioritizer -->
UpdateAttribute (adding epoch from the filename date) -- connection with
PriorityAttributePrioritizer --> rest of your flow


Once again, assuming the file name is file_2017-03-03T010101.csv, the
expression language for the priority attribute would be something like:

${filename:toDate("'file_'yyyy-MM-dd'T'HHmmss'.csv'", "UTC"):toNumber()}
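
For reference, here is a rough Python equivalent of what that expression is meant to compute (milliseconds since the epoch, in UTC), so you can sanity-check the values outside NiFi. It is only a sketch: filename_to_epoch_millis is a hypothetical helper, and it assumes every filename follows the file_yyyy-MM-ddTHHmmss.csv pattern exactly.

```python
import calendar
from datetime import datetime

def filename_to_epoch_millis(filename):
    """Parse names like file_2017-03-03T010101.csv into epoch milliseconds (UTC)."""
    parsed = datetime.strptime(filename, "file_%Y-%m-%dT%H%M%S.csv")
    # timegm interprets the time tuple as UTC and returns whole seconds.
    return calendar.timegm(parsed.timetuple()) * 1000

names = [
    "file_2017-03-03T010101.csv",
    "file_2017-03-01T230000.csv",
    "file_2017-03-02T120000.csv",
]
# Sorting by the derived number puts the oldest file first.
ordered = sorted(names, key=filename_to_epoch_millis)
print(filename_to_epoch_millis("file_2017-03-03T010101.csv"))  # 1488502861000
print(ordered)
```

Since older files get smaller numbers, a prioritizer that serves the lowest priority value first (which, as far as I know, is how PriorityAttributePrioritizer behaves) would release the oldest file first.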


Would that help?


[1] https://lists.apache.org/thread.html/203ddc0423ac7f877817ad5e2b389f079c2a27d8d4b4ef998ad91a32@1449844053@%3Cdev.nifi.apache.org%3E


On 3 Mar 2017 5:27 PM, "prabhu Mahendran" <[email protected]> wrote:

> This task (NIFI-470) suits part of the workflow. If I set concurrent
> tasks to 10, records run in parallel, so the files get shuffled, as I
> can see in the List Queue.
>
>
>
> Once we get the files in order from GetFile, how can I ensure the data from
> each file is moved to the destination (say, SQL) in the same order when
> concurrent tasks are used?
>
>
>
> I need a flow like this: if file1 has 10 records, they should be
> prioritized with the values 1 to 10, then the records of file2 should start
> at priority value 11, and so on. Filenames can be ordered by the
> date from the GetFile processor. That way I can ensure the ordered files are
> moved into SQL in the same order.
>
>
>
> Will this be achieved in the ticket, or do you have any suggestion for this?
>
> On Fri, Mar 3, 2017 at 11:37 AM, Andre <[email protected]> wrote:
>
>> Hi,
>>
>> There's an existing JIRA ticket (NIFI-470) requesting a way to allow a DFM
>> to fine-tune how GetFile builds its queues and control how to prioritise
>> the consumption of files.
>>
>> Would that be what you are looking for?
>>
>> Cheers
>>
>>
>> On 3 Mar 2017 15:55, "prabhu Mahendran" <[email protected]> wrote:
>>
>> Yes, exactly, you got my point.
>>
>>
>>
>> Given that the filename contains a date, how can I prioritize the files
>> from the directory based on that date (oldest date first, latest date
>> last)?
>>
>>
>>
>> Issue faced here: Consider I have 2 files in the directory. After
>> GetFile->SplitText->ExtractText, I set the priority attribute in
>> UpdateAttribute. Now each file is initialized with priority 1...10: for
>> file1, the records have priority values 1 to 10, and similarly for file2
>> the records have priority values 1 to 10. What I actually want is for the
>> input files to be prioritized based on the date in the filename, so that
>> the records with the oldest date are processed first and those with the
>> latest date last.
>>
>>
>>
>> On Thu, Mar 2, 2017 at 6:39 PM, Bryan Bende <[email protected]> wrote:
>>
>>> So in your example you are saying that 10 files get placed in a
>>> directory, and inside each of those 10 files the data is already
>>> ordered the way you want, but you want to ensure the 10 files get
>>> processed in a specific order?
>>>
>>> If that is true, what determines the order of the 10 files? Is it
>>> based on the order they were written to the directory? Or is there
>>> something in the filename that indicates which file comes first? In
>>> order for NiFi to prioritize these files, there has to be something
>>> that tells NiFi what the priority is.
>>>
>>> On Wed, Mar 1, 2017 at 11:56 PM, prabhu Mahendran
>>> <[email protected]> wrote:
>>> > As you suggested, setting up 3 UpdateAttribute processors may be
>>> > tedious. If I have more than 10 flowfiles, setting up 10 UpdateAttribute
>>> > processors is too lengthy. It is also not possible when flowfiles are
>>> > generated dynamically.
>>> >
>>> >
>>> >
>>> > How do I set the priority attribute for the flowfiles from GetFile?
>>> > Suppose I get 10 files in the GetFile processor. Based on my priority, I
>>> > have ordered each line of the files all the way to PutSQL. But the files
>>> > are picked up by file creation time, regardless of that order, so the
>>> > data is not moved in my intended order. That is why I chose the
>>> > PriorityAttributePrioritizer and used the UpdateAttribute processor.
>>> >
>>> >
>>> >
>>> > I am able to set the priority attribute for each line in a file, but
>>> > not for each file from GetFile. Can you suggest a solution?
>>> >
>>> >
>>> >
>>> >
>>> > On Wed, Mar 1, 2017 at 7:18 PM, Bryan Bende <[email protected]> wrote:
>>> >>
>>> >> I just responded to this question on stackoverflow:
>>> >>
>>> >>
>>> >> https://stackoverflow.com/questions/42528993/how-to-specify-priority-attributes-for-seperate-flowfiles
>>> >>
>>> >> Thanks,
>>> >>
>>> >> Bryan
>>> >>
>>> >> On Wed, Mar 1, 2017 at 5:19 AM, prabhu Mahendran
>>> >> <[email protected]> wrote:
>>> >> > I need to use PriorityAttributePrioritizer in NiFi.
>>> >> >
>>> >> > I have looked at the prioritizers in the reference below.
>>> >> > https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#settings
>>> >> >
>>> >> > If I receive 10 flowfiles, then I need to set a unique priority value
>>> >> > for every flowfile.
>>> >> >
>>> >> > After that, the queue configuration must specify
>>> >> > PriorityAttributePrioritizer.
>>> >> >
>>> >> > The flowfiles are then processed based on the priority value.
>>> >> >
>>> >> > How can I set the priority value for separate flowfiles, or which
>>> >> > prioritizer in NiFi will work for my case?
>>> >
>>> >
>>>
>>
>>
>>
>
