Thank you, Mark, for your response. It will be really helpful when implementing the processor.
Thank you,
Vibhath

On Mon, 8 Mar 2021, 7:53 pm Mark Payne, <[email protected]> wrote:

> I suspect that you would indeed want to create a custom processor to
> handle this. I'm not sure what kind of performance requirements you might
> have, but here are some things you'd need to think through:
>
> - To guarantee ordering, you would have to avoid processing until all
> FlowFiles have been queued up, or, if there is some mechanism you can use
> to understand which FlowFile comes next, at least until the "next"
> FlowFile is available. It would be up to you to work out the logic for
> determining that all of the data that is needed is available.
>
> - This would need to be single-threaded, to ensure that the FlowFiles
> are processed in order. So you'd want to use the @TriggerSerially
> annotation.
>
> - Assuming that you don't allow for some more sophisticated partitioning
> logic, and that you do, in fact, require that each FlowFile be processed
> one after the other, it means that in a cluster, all data must be
> funneled to a single node. You can do this by configuring the Connection
> into this processor with a Load Balancing Strategy of "Single Node."
>
> - You will need to save state, via the ProcessSession.setState() method.
> You could potentially use a scope of LOCAL, but if you're running a
> cluster and decide to move processing from one node to another at some
> point, you'd lose that state, so you may want to use CLUSTER scope.
> Perhaps the only thing necessary to save in state is the y_val of the
> last row seen.
>
> - This approach, where you remember the last value from the previous
> FlowFile, is very non-scalable, because the data must be processed
> sequentially. If your data volumes are high at all, you'll certainly want
> to spend some time thinking about this approach to see if there's a
> better way to attack your problem.
>
> Hope this is helpful.
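Mark's first point above — holding back processing until the "next" FlowFile is available — can be illustrated with a short sketch. This is a hypothetical Python stand-in, not NiFi's Java API: a small sequencer that buffers contents arriving out of order and releases them only once the next expected file (by alphabetical name) has arrived. The class and method names are invented for illustration.

```python
# Hypothetical sketch of the ordering guarantee: buffer out-of-order
# arrivals and release them only when the next expected item is present.
# Here file names sort alphabetically, so the release order is known
# up front.

class Sequencer:
    def __init__(self, expected_names):
        self.pending = {}                     # name -> content, awaiting its turn
        self.expected = sorted(expected_names)  # alphabetical release order
        self.idx = 0                          # index of the next name to release

    def offer(self, name, content):
        """Accept an item in any order; return the batch that is now releasable."""
        self.pending[name] = content
        released = []
        # Drain every consecutively-available item, not just the newest one.
        while self.idx < len(self.expected) and self.expected[self.idx] in self.pending:
            nxt = self.expected[self.idx]
            released.append((nxt, self.pending.pop(nxt)))
            self.idx += 1
        return released
```

Offering file2 before file1 yields nothing; once file1 arrives, both are released together, preserving the order the downstream prev_y_val logic depends on.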
>
> Thanks
> -Mark
>
> On Mar 7, 2021, at 1:09 AM, Vibhath Ileperuma <[email protected]> wrote:
>
> Hi all,
>
> I have a set of CSV files, each containing 'x_val' and 'y_val' columns,
> stored in an S3 bucket. For example:
>
> file1.csv
>
> x_val,y_val
> 1,2
> 2,5
> 3,2
>
> file2.csv
>
> x_val,y_val
> 4,8
> 5,3
> 6,5
>
> I need to:
>
> 1. List the CSV files in alphabetical order.
> 2. Add a new column 'prev_y_val', which
>    1. should contain the value of the 'y_val' column of the previous
>       row;
>    2. in the first row of a file, should contain the value of the
>       'y_val' column in the last row of the previous file (only the
>       first row of the first file may be null). Even if the NiFi
>       instance is killed while processing a file, it should still be
>       able to write the value of the first row correctly.
>
> Expected output:
>
> file1.csv
>
> x_val,y_val,prev_y_val
> 1,2,
> 2,5,2
> 3,2,5
>
> file2.csv
>
> x_val,y_val,prev_y_val
> 4,8,2
> 5,3,8
> 6,5,3
>
> I'd be grateful if you could suggest a way to implement this logic. If
> it is required to create a custom processor, could you please suggest
> best practices for state management between two FlowFiles?
>
> Thanks & Regards
>
> Vibhath Ileperuma
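For completeness, the prev_y_val carry-over described in the question can be sketched in Python. This is a hedged illustration, not a NiFi processor: the JSON file used here is a hypothetical stand-in for ProcessSession.setState() with CLUSTER scope, persisting only the y_val of the last row seen, so that after a restart the first row of the next file is still filled in correctly. The function names and the state-file layout are invented for this sketch.

```python
import json
import os

def load_last_y_val(state_path):
    # Returns "" before any file has been processed, matching the empty
    # prev_y_val allowed only in the very first row of the first file.
    if os.path.exists(state_path):
        with open(state_path) as f:
            return json.load(f)["last_y_val"]
    return ""

def save_last_y_val(state_path, last_y_val):
    with open(state_path, "w") as f:
        json.dump({"last_y_val": last_y_val}, f)

def process_files(file_contents, state_path):
    # Single-threaded by construction (the analogue of @TriggerSerially):
    # files are handled strictly in alphabetical order, one at a time.
    results = {}
    for name in sorted(file_contents):
        lines = file_contents[name].strip().split("\n")
        prev = load_last_y_val(state_path)
        out = [lines[0] + ",prev_y_val"]
        for line in lines[1:]:
            y_val = line.split(",")[1]
            out.append(line + "," + prev)
            prev = y_val
        # Persist state only after the whole file is emitted, so a crash
        # mid-file replays that file with the same starting value.
        save_last_y_val(state_path, prev)
        results[name] = "\n".join(out) + "\n"
    return results
```

Running this over the two example files reproduces the expected output: file1.csv's first row gets an empty prev_y_val, and file2.csv's first row picks up 2, the last y_val of file1.csv, even if the state is reloaded between files.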
