What Andre described is what I had in mind as well. One thing to keep in mind: I think you can only guarantee the ordering if all the files you want to process are picked up in a single execution of GetFile.
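Here is a rough sketch of that limitation in plain Python (not NiFi code). It assumes 100 hypothetical files named like file_2017-03-03T010101.csv, a directory listing that happens to return the newest file first, a pickup of 10 files per execution, and that each batch is fully drained downstream before the next one is picked up. The function names are made up for illustration; the epoch parsing only mirrors the idea of turning the date in the filename into an increasing number.

```python
from datetime import datetime, timezone


def epoch_from_filename(name: str) -> int:
    """Turn a name like file_2017-03-03T010101.csv into epoch milliseconds (UTC)."""
    parsed = datetime.strptime(name, "file_%Y-%m-%dT%H%M%S.csv")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def process_in_batches(listing, batch_size):
    """Pick up `batch_size` files per execution and order each batch by epoch.

    Simplifying assumption: a batch is completely processed before the next
    one is picked up, so prioritization never spans two batches.
    """
    processed = []
    for start in range(0, len(listing), batch_size):
        batch = listing[start:start + batch_size]
        processed.extend(sorted(batch, key=epoch_from_filename))
    return processed


def is_totally_ordered(names) -> bool:
    """True when the names are in oldest-to-newest order."""
    epochs = [epoch_from_filename(n) for n in names]
    return epochs == sorted(epochs)


# 100 hypothetical files, one per second, oldest first.
files = [f"file_2017-03-03T01{m:02d}{s:02d}.csv" for m in range(2) for s in range(50)]

# Assumed directory listing order: newest first (listing order is not guaranteed).
listing = list(reversed(files))

# Each batch of 10 is ordered internally, but the 100 files are not.
print(is_totally_ordered(process_in_batches(listing, 10)))

# One pickup that covers every file gives total order.
print(is_totally_ordered(process_in_batches(listing, 100)))
```

With a pickup of 10, every group of 10 comes out oldest-first, yet the first group still holds the 10 newest files, so the overall sequence is out of order. Only when a single pickup covers all 100 files does the per-batch ordering become a total ordering.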
For example, imagine there are 100 files in the directory and GetFile's Batch Size is set to 10 (the default). The first time GetFile executes, it picks up 10 of the 100 files, and using Andre's example with the epoch as the priority, you can get those 10 flow files processed in order. If you want total order across all 100 files, you would either need the batch size to be greater than the total number of files, or you would need some kind of custom processor that waits for N flow files. If the queue in front of that processor used the PriorityAttributePrioritizer, all 100 flow files would sit in that queue in priority order before any of them were released for processing.

On Fri, Mar 3, 2017 at 2:59 AM, Andre <[email protected]> wrote:
> Prabhu,
>
> I suspect you need to rethink your use of concurrency on your workflow. I
> give you an example:
>
> You spoke about 10 concurrent GetFile threads reading a repository and their
> consequent ordering:
>
> Suppose you have 2 threads consuming:
>
> file1 - 10 MB
> file2 - 20 MB
> file3 - 50 MB
> file4 - 10 MB
> file5 - 10 MB
> file6 - 10 MB
>
> All things equal, consider each of the 2 threads consume and dispatch the
> files at the same speed. How can you guarantee that thread 1 will consume
> file5 (i.e. as in t1-f1, t2-f2, t1-f3, t2-f4, t1-f5, t2-f6)?
>
> Or as Brandon DeVries clearly put a long while ago[1]:
>
> "Just because a FlowFile begins processing first doesn't mean it will
> complete first (assuming the processor has multiple concurrent tasks)"
>
> Brandon goes further and provides some suggestions that may help you binning
> your flowfiles and records together, but in any case...
>
> Assuming the filename is named based on a date (e.g.
> file_2017-03-03T010101.csv), have you considered using UpdateAttributes to
> parse the filename into a date, that date into Epoch (which happens to be an
> increasing number) as a first level index / prioritizer?
>
> This way you could have:
>
> GetFile (single thread) -- Connector with FIFO --> UpdateAttribute (adding
> Epoch from filename date) -- Connector with PriorityAttributePrioritizer -->
> rest of your flow
>
> Once again, assuming the file name is file_2017-03-03T010101.csv, the
> expression language would be something like:
>
> ${filename:toDate("'file_'yyyy-MM-dd'T'HHmmss'.csv'", "UTC"):toNumber()}
>
> Would that help?
>
> [1]
> https://lists.apache.org/thread.html/203ddc0423ac7f877817ad5e2b389f079c2a27d8d4b4ef998ad91a32@1449844053@%3Cdev.nifi.apache.org%3E
>
> On 3 Mar 2017 5:27 PM, "prabhu Mahendran" <[email protected]> wrote:
>>
>> This task(NIFI-470) suits to some of the workflow. If I set concurrent
>> task to 10, records runs in parallel so that each file gets shuffled as I
>> can see in the List Queue.
>>
>> If we get order of files from the Getfile, How I can ensure the data from
>> each file is properly moved to destination(consider SQL) in same order with
>> respect to concurrent task also?
>>
>> I need flow like this: Consider file1 has 10 records and it should be
>> prioritized from the value 1 to 10, then next file2 records should start with
>> the priority value 11 to so on.. Filename can be in the order of the date
>> from the getfile processor. Here I can ensure each ordered files are moved
>> in the same order into SQL.
>>
>> Will this be achieved in the ticket or any suggestion for this?
>>
>> On Fri, Mar 3, 2017 at 11:37 AM, Andre <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> There's an existing JIRA ticket(NIFI-470) requesting a way to allow a DFM
>>> to fine tune how GetFile build it's queues and control how to prioritise the
>>> consumption of files.
>>>
>>> Would that be what you are looking after?
>>>
>>> Cheers
>>>
>>> On 3 Mar 2017 15:55, "prabhu Mahendran" <[email protected]> wrote:
>>>
>>> Yes, exactly you got my point.
>>>
>>> Consider the filename contains date, how to prioritize the files from the
>>> directory to come first based on the date(oldest date comes first to the
>>> latest date comes last)?
>>>
>>> Issue faced here: Consider I have 2 files in the directory, after the
>>> GetFile->SplitText->ExtractText, I used priority attribute in
>>> UpdateAttribute. Now each file is initialized with priority 1...10. For
>>> file1, each records has 1 to 10 priority value, similarly for file2, each
>>> records has 1 to 10 priority value. Actually I want input files to be
>>> prioritized based on date in the filename? So that finally, oldest date
>>> records will be processed first and then the latest date records.
>>>
>>> On Thu, Mar 2, 2017 at 6:39 PM, Bryan Bende <[email protected]> wrote:
>>>>
>>>> So in your example you are saying that 10 files get placed in a
>>>> directory, and inside each of those 10 files the data is already
>>>> ordered the way you want, but you want to ensure the 10 files get
>>>> processed in a specific order?
>>>>
>>>> If that is true, what determines the order of the 10 files? is it
>>>> based on the order they were written to the directory? or is there
>>>> something in the filename that indicates which file comes first? In
>>>> order for NiFi to prioritize these files, there has to be something
>>>> that tells NiFi what the priority is.
>>>>
>>>> On Wed, Mar 1, 2017 at 11:56 PM, prabhu Mahendran
>>>> <[email protected]> wrote:
>>>> > As you suggested, setting 3 UpdateAttribute may be tedious. Suppose I
>>>> > have more than 10 flowfiles setting 10 updateattribute processor is
>>>> > lengthy one. This case also not possible for dynamically generating
>>>> > flowfiles.
>>>> >
>>>> > How to set priority attribute for the flowfiles from Getfile? Suppose
>>>> > I get 10 files in the Getfile processor, based on my priority I have
>>>> > ordered the flowfile each line in the files till PutSQL.
>>>> > Here without considering the order, based on the filecreation time,
>>>> > data is moved without my ordered records. For this case only I decided
>>>> > with the PriorityAttributePrioritizer and used UpdateAttribute
>>>> > processor.
>>>> >
>>>> > I can able to set the priority attribute for each line in the file,
>>>> > but not each files from GetFile. Can you suggest any solution?
>>>> >
>>>> > On Wed, Mar 1, 2017 at 7:18 PM, Bryan Bende <[email protected]> wrote:
>>>> >>
>>>> >> I just responded to this question on stackoverflow:
>>>> >>
>>>> >> https://stackoverflow.com/questions/42528993/how-to-specify-priority-attributes-for-seperate-flowfiles
>>>> >>
>>>> >> Thanks,
>>>> >>
>>>> >> Bryan
>>>> >>
>>>> >> On Wed, Mar 1, 2017 at 5:19 AM, prabhu Mahendran
>>>> >> <[email protected]> wrote:
>>>> >> > I need to use PrioritizeAttributePrioritizer in NiFi.
>>>> >> >
>>>> >> > i have observed that prioritizers in below reference.
>>>> >> >
>>>> >> > https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#settings
>>>> >> >
>>>> >> > if i receive 10 flowfiles then i need to set the priority value for
>>>> >> > every flow file to be unique.
>>>> >> >
>>>> >> > After that specify queue configuration must be
>>>> >> > PrioritizeAttributePrioritizer.
>>>> >> >
>>>> >> > Then processing flowfiles based on priority value.
>>>> >> >
>>>> >> > How can i set priority value for separate flow files or which
>>>> >> > prioritizer in NiFi to be work for my case?
