I suspect that you would indeed want to create a custom processor to handle this. I'm not sure what kind of performance requirements you have, but here are some things you'd need to think through:
- To guarantee ordering, you would have to avoid processing until all FlowFiles have been queued up. Or, if there's some mechanism you can use to understand which FlowFile will come next, at least until the "next" FlowFile is available. It would be up to you to figure out the logic for determining that all of the data that is needed is available.
- This would need to be single-threaded, to ensure that the FlowFiles are processed in order. So you'd want to use the @TriggerSerially annotation.
- Assuming that you don't allow for some more sophisticated partitioning logic, and that you do in fact require that each FlowFile be processed one after the other, it means that in a cluster, all data must be funneled to a single node. You can do this by configuring the Connection into this processor with a Load Balancing Strategy of "Single Node."
- You will need to save state, via the ProcessSession.setState() method. You could potentially use a scope of LOCAL, but if you're running a cluster and decide to move processing from one node to another at some point, you'd lose the state, so you may want to use CLUSTER state. Perhaps the only thing necessary to save in state is the y_val of the last row seen.
- This approach, where you remember the last value from the previous FlowFile, is very non-scalable, as the data must be processed sequentially. If your data volumes are high at all, you'll certainly want to spend some time thinking about this approach to see if there's a better way to attack your problem.

Hope this is helpful.

Thanks
-Mark

On Mar 7, 2021, at 1:09 AM, Vibhath Ileperuma <[email protected]<mailto:[email protected]>> wrote:

Hi all,

I have a set of CSV files which contain 'x_val' and 'y_val' columns, stored in an S3 bucket.

Ex:
file1.csv
x_val,y_val
1,2
2,5
3,2

file2.csv
x_val,y_val
4,8
5,3
6,5

I need to:
1. List the CSV files in alphabetical order.
2. Add a new column 'prev_y_val' * which should contain the value of the 'y_val' column of the previous row.
* In the first row of a file, this column should contain the value of the 'y_val' column in the last row of the previous file (only the first row of the first file can be null). Even if the NiFi instance is killed while processing a file, it should be able to write the value of the first row correctly.

Ex:
file1.csv
x_val,y_val,prev_y_val
1,2,
2,5,2
3,2,5

file2.csv
x_val,y_val,prev_y_val
4,8,2
5,3,8
6,5,3

I'm grateful if you can suggest a way to implement this logic. If it is required to create a custom processor, could you please suggest the best practices for state management between two FlowFiles.

Thanks & Regards
Vibhath Ileperuma
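To make the state-carrying approach described above concrete, here is a rough, untested sketch of the core transformation such a custom processor's onTrigger() would perform: rewrite one CSV FlowFile's content to append a prev_y_val column, threading the last y_val forward between files. The NiFi-specific wiring (ProcessSession.getState()/setState() with Scope.CLUSTER, the @TriggerSerially annotation) is indicated only in comments so the snippet stays self-contained; the class and method names here are illustrative, not from any NiFi API.

```java
import java.util.ArrayList;
import java.util.List;

public class PrevYVal {

    // Result of transforming one file: the rewritten lines, plus the last
    // y_val seen. In a real processor, lastY is what you would persist via
    // session.setState(Map.of("last_y", lastY), Scope.CLUSTER) so that a
    // restart (or a different node, with CLUSTER scope) can resume correctly.
    static class Result {
        final List<String> lines;
        final String lastY;
        Result(List<String> lines, String lastY) { this.lines = lines; this.lastY = lastY; }
    }

    // prevY is the value recovered from state for the previous file
    // (null before the very first file, giving an empty prev_y_val in row 1).
    // Assumes simple two-column CSV with y_val as the last column.
    static Result addPrevColumn(List<String> csvLines, String prevY) {
        List<String> out = new ArrayList<>();
        out.add(csvLines.get(0) + ",prev_y_val");  // extend the header row
        String last = prevY;
        for (int i = 1; i < csvLines.size(); i++) {
            String row = csvLines.get(i);
            String y = row.substring(row.lastIndexOf(',') + 1).trim();
            out.add(row + "," + (last == null ? "" : last));
            last = y;
        }
        return new Result(out, last);
    }

    public static void main(String[] args) {
        List<String> file1 = List.of("x_val,y_val", "1,2", "2,5", "3,2");
        List<String> file2 = List.of("x_val,y_val", "4,8", "5,3", "6,5");

        // First file: no prior state, so prev_y_val starts empty.
        Result r1 = addPrevColumn(file1, null);
        // Second file: seeded with the y_val carried over "in state".
        Result r2 = addPrevColumn(file2, r1.lastY);

        System.out.println(String.join("\n", r1.lines));
        System.out.println(String.join("\n", r2.lines));
    }
}
```

Because the carried value is only committed alongside the rewritten FlowFile, the processor itself would remain single-threaded (@TriggerSerially) and should update state in the same session that transfers the FlowFile, so that a crash mid-file does not lose or duplicate the carried y_val.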
