Bryan & Andre,

The PriorityAttributePrioritizer and FIFO strategy worked when the flow has no loop processing.

However, I have configured loop processing in my workflow using this reference:
https://gist.github.com/ijokarumawak/01c4fd2d9291d3e74ec424a581659ca8#file-loop_sample-xml

For example, if I process two files with the GetFile processor, the contents of those flow files get shuffled after the files go through the loop. That is why I can't prioritize my flow files.

Now I need to combine flow files that have the same filename.

Use case: consider two files named file1 and file2. I pick up both files, make some changes to their contents, and then loop the flow to repeat the same modifications for all files. After that, the queue holds shuffled flow files. I am trying to combine flow files based on filename: check the filename, then combine all flow files that have the same name into one flow file.

Can you suggest a way to meet this requirement?

Thanks in advance,

On Fri, Mar 3, 2017 at 8:08 PM, Bryan Bende <[email protected]> wrote:
> What Andre described is what I had in mind as well...
>
> One thing to keep in mind is that I think you can only guarantee the
> ordering if all the files you want to process are picked up in one
> execution of GetFile.
>
> For example, imagine there are 100 files in the directory, and
> GetFile's Batch Size is set to 10 (the default). The first time
> GetFile executes it is going to get 10 out of the 100 flow files, and
> then using Andre's example with the epoch as the priority, you can get
> those 10 flow files processed in order.
>
> If you were trying to get total order across all 100 files, you would
> either need the batch size to be greater than the total number of
> files, or you would need some kind of custom processor that waited for
> N flow files, and then if the queue before that processor used the
> PriorityAttributePrioritizer, then you would be waiting until all 100
> flow files were in the queue in priority order before letting any of
> them process.
>
> On Fri, Mar 3, 2017 at 2:59 AM, Andre <[email protected]> wrote:
> > Prabhu,
> >
> > I suspect you need to rethink your use of concurrency on your workflow.
> > I give you an example:
> >
> > You spoke about 10 concurrent GetFile threads reading a repository and
> > their consequent ordering:
> >
> > Suppose you have 2 threads consuming:
> >
> > file1 - 10 MB
> > file2 - 20 MB
> > file3 - 50 MB
> > file4 - 10 MB
> > file5 - 10 MB
> > file6 - 10 MB
> >
> > All things equal, consider each of the 2 threads consume and dispatch
> > the files at the same speed. How can you guarantee that thread 1 will
> > consume file5 (i.e. as in t1-f1, t2-f2, t1-f3, t2-f4, t1-f5, t2-f6)?
> >
> > Or as Brandon DeVries clearly put a long while ago [1]:
> >
> > "Just because a FlowFile begins processing first doesn't mean it will
> > complete first (assuming the processor has multiple concurrent tasks)"
> >
> > Brandon goes further and provides some suggestions that may help you
> > binning your flowfiles and records together, but in any case...
> >
> > Assuming the filename is named based on a date (e.g.
> > file_2017-03-03T010101.csv), have you considered using UpdateAttribute
> > to parse the filename into a date, that date into Epoch (which happens
> > to be an increasing number) as a first level index / prioritizer?
> >
> > This way you could have:
> >
> > GetFile (single thread) -- Connector with FIFO --> UpdateAttribute
> > (adding Epoch from filename date) -- Connector with
> > PriorityAttributePrioritizer --> rest of your flow
> >
> > Once again, assuming the file name is file_2017-03-03T010101.csv, the
> > expression language would be something like:
> >
> > ${filename:toDate("'file_'yyyy-MM-dd'T'HHmmss'.csv'", "UTC"):toNumber()}
> >
> > Would that help?
> >
> > [1]
> > https://lists.apache.org/thread.html/203ddc0423ac7f877817ad5e2b389f079c2a27d8d4b4ef998ad91a32@1449844053@%3Cdev.nifi.apache.org%3E
> >
> > On 3 Mar 2017 5:27 PM, "prabhu Mahendran" <[email protected]> wrote:
> >>
> >> This task (NIFI-470) suits some workflows. If I set concurrent tasks
> >> to 10, records run in parallel, so each file gets shuffled, as I can
> >> see in the List Queue.
> >>
> >> If we get the order of files from GetFile, how can I ensure the data
> >> from each file is properly moved to the destination (consider SQL) in
> >> the same order with respect to concurrent tasks as well?
> >>
> >> I need a flow like this: consider file1 has 10 records, and they
> >> should be prioritized from 1 to 10; then the records of file2 should
> >> start with priority value 11, and so on. Filenames can be ordered by
> >> date from the GetFile processor. This way I can ensure the ordered
> >> files are moved into SQL in the same order.
> >>
> >> Will this be achieved in the ticket, or is there any suggestion for
> >> this?
> >>
> >> On Fri, Mar 3, 2017 at 11:37 AM, Andre <[email protected]> wrote:
> >>>
> >>> Hi,
> >>>
> >>> There's an existing JIRA ticket (NIFI-470) requesting a way to allow
> >>> a DFM to fine tune how GetFile builds its queues and control how to
> >>> prioritise the consumption of files.
> >>>
> >>> Would that be what you are looking for?
> >>>
> >>> Cheers
> >>>
> >>> On 3 Mar 2017 15:55, "prabhu Mahendran" <[email protected]> wrote:
> >>>
> >>> Yes, exactly, you got my point.
> >>>
> >>> Consider the filename contains a date; how do I prioritize the files
> >>> from the directory based on the date (oldest date first, latest date
> >>> last)?
> >>>
> >>> Issue faced here: consider I have 2 files in the directory. After
> >>> GetFile->SplitText->ExtractText, I used the priority attribute in
> >>> UpdateAttribute. Now each file is initialized with priority 1...10.
> >>> For file1, the records have priority values 1 to 10; similarly for
> >>> file2, the records have priority values 1 to 10. What I actually want
> >>> is for the input files to be prioritized based on the date in the
> >>> filename, so that the records with the oldest date are processed
> >>> first and then the records with the latest date.
> >>>
> >>> On Thu, Mar 2, 2017 at 6:39 PM, Bryan Bende <[email protected]> wrote:
> >>>>
> >>>> So in your example you are saying that 10 files get placed in a
> >>>> directory, and inside each of those 10 files the data is already
> >>>> ordered the way you want, but you want to ensure the 10 files get
> >>>> processed in a specific order?
> >>>>
> >>>> If that is true, what determines the order of the 10 files? Is it
> >>>> based on the order they were written to the directory? Or is there
> >>>> something in the filename that indicates which file comes first? In
> >>>> order for NiFi to prioritize these files, there has to be something
> >>>> that tells NiFi what the priority is.
> >>>>
> >>>> On Wed, Mar 1, 2017 at 11:56 PM, prabhu Mahendran
> >>>> <[email protected]> wrote:
> >>>> > As you suggested, setting 3 UpdateAttribute processors may be
> >>>> > tedious. Suppose I have more than 10 flowfiles; setting up 10
> >>>> > UpdateAttribute processors is lengthy. This is also not possible
> >>>> > for dynamically generated flowfiles.
> >>>> >
> >>>> > How do I set the priority attribute for the flowfiles from
> >>>> > GetFile? Suppose I get 10 files in the GetFile processor; based on
> >>>> > my priority I have ordered each line of the files as flowfiles up
> >>>> > to PutSQL. Here the data is moved based on the file creation time,
> >>>> > without regard to my record order. For this case only I decided on
> >>>> > the PriorityAttributePrioritizer and used the UpdateAttribute
> >>>> > processor.
> >>>> >
> >>>> > I am able to set the priority attribute for each line in the file,
> >>>> > but not for each file from GetFile. Can you suggest any solution?
> >>>> >
> >>>> > On Wed, Mar 1, 2017 at 7:18 PM, Bryan Bende <[email protected]> wrote:
> >>>> >>
> >>>> >> I just responded to this question on stackoverflow:
> >>>> >>
> >>>> >> https://stackoverflow.com/questions/42528993/how-to-specify-priority-attributes-for-seperate-flowfiles
> >>>> >>
> >>>> >> Thanks,
> >>>> >>
> >>>> >> Bryan
> >>>> >>
> >>>> >> On Wed, Mar 1, 2017 at 5:19 AM, prabhu Mahendran
> >>>> >> <[email protected]> wrote:
> >>>> >> > I need to use the PriorityAttributePrioritizer in NiFi.
> >>>> >> >
> >>>> >> > I have looked at the prioritizers described in the reference
> >>>> >> > below.
> >>>> >> >
> >>>> >> > https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#settings
> >>>> >> >
> >>>> >> > If I receive 10 flowfiles, I need to set a unique priority value
> >>>> >> > for every flow file.
> >>>> >> >
> >>>> >> > After that, the queue must be configured with the
> >>>> >> > PriorityAttributePrioritizer.
> >>>> >> >
> >>>> >> > Then the flowfiles are processed based on priority value.
> >>>> >> >
> >>>> >> > How can I set the priority value for separate flow files, or
> >>>> >> > which prioritizer in NiFi would work for my case?
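
For reference, the two ideas discussed in this thread (an epoch priority derived from the filename, and combining flow files that share a filename) can be sketched in plain Python. This is only an illustration of the logic, not NiFi code: the function names, the tuple layout (filename, index, content), and the per-file index are assumptions made for the example, and the filename pattern is the one Andre assumed (file_2017-03-03T010101.csv).

```python
from collections import defaultdict
from datetime import datetime, timezone


def epoch_priority(filename):
    """Parse names like file_2017-03-03T010101.csv into epoch milliseconds (UTC)."""
    parsed = datetime.strptime(filename, "file_%Y-%m-%dT%H%M%S.csv")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def combine_by_filename(flowfiles):
    """Group (filename, index, content) tuples by filename and join each group.

    Groups come out oldest first; fragments inside a group follow their index.
    The index is a hypothetical per-file sequence number.
    """
    groups = defaultdict(list)
    for filename, index, content in flowfiles:
        groups[filename].append((index, content))
    combined = []
    for filename in sorted(groups, key=epoch_priority):
        parts = [content for _, content in sorted(groups[filename])]
        combined.append((filename, "".join(parts)))
    return combined


# A shuffled queue, as it might look after the loop (made-up sample data).
shuffled = [
    ("file_2017-03-03T010101.csv", 1, "b2\n"),
    ("file_2017-03-02T010101.csv", 0, "a1\n"),
    ("file_2017-03-03T010101.csv", 0, "b1\n"),
    ("file_2017-03-02T010101.csv", 1, "a2\n"),
]
for name, content in combine_by_filename(shuffled):
    print(name, repr(content))
```

The sketch only works if every fragment carries both the original filename and some position within that file; without the position, fragments can be grouped by name but their original order cannot be restored.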
