I suspect that you would indeed want to create a custom processor to handle 
this. I’m not sure what performance requirements you have, but here are some 
things you’d need to think through:

- To guarantee ordering, you would have to hold off processing until all 
FlowFiles have been queued up or, if there’s some mechanism you can use to 
determine which FlowFile comes next, at least until the “next” FlowFile is 
available. It would be up to you to work out the logic that determines when all 
of the needed data is available.

- This would need to be single-threaded to ensure that the FlowFiles are 
processed in order, so you’d want to use the @TriggerSerially annotation.

- Assuming that you don’t allow for some more sophisticated partitioning logic, 
and that you do, in fact, require the FlowFiles to be processed one after 
another, then in a cluster all data must be funneled to a single node. 
You can do this by configuring the Connection into this processor with a Load 
Balancing Strategy of “Single Node.”

- You will need to save state via the ProcessSession.setState() method. You 
could potentially use a scope of LOCAL, but if you’re running a cluster and 
decide at some point to move processing from one node to another, you’d lose 
the state, so you may want to use CLUSTER state. Perhaps the only thing you 
need to save in state is the y_val of the last row seen.

- This approach, where you remember the last value from the previous FlowFile, 
does not scale well, because the data must be processed sequentially. If your 
data volumes are at all high, you’ll certainly want to spend some time thinking 
about whether there’s a better way to attack your problem.
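For the first point, one way to decide that the “next” FlowFile is available 
is to know the full set of file names up front (for example, from the S3 
listing) and release files only in alphabetical order. The plain-Java sketch 
below illustrates that idea; InOrderBuffer is a hypothetical helper, not part 
of the NiFi API, and it assumes the complete list of names is known before 
processing starts. It is deliberately not thread-safe, since it would only be 
called from a processor annotated with @TriggerSerially.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not a NiFi class): releases file names strictly in
// alphabetical order, holding back any file that arrives before all of the
// files that sort ahead of it. Not thread-safe; it assumes a single caller,
// as with a processor annotated with @TriggerSerially.
class InOrderBuffer {
    private final List<String> expectedOrder;         // full listing, sorted
    private final Set<String> arrived = new HashSet<>();
    private int nextIndex = 0;                         // next name allowed out

    InOrderBuffer(List<String> allNames) {
        // Assumes the complete set of names (e.g. from the S3 listing) is known.
        expectedOrder = new ArrayList<>(allNames);
        Collections.sort(expectedOrder);
    }

    // Records that a file has arrived and returns every file that can now be
    // processed, in order. Returns an empty list if the next file is missing.
    List<String> offer(String name) {
        arrived.add(name);
        List<String> ready = new ArrayList<>();
        // Set.remove returns true only if the expected name has arrived.
        while (nextIndex < expectedOrder.size()
                && arrived.remove(expectedOrder.get(nextIndex))) {
            ready.add(expectedOrder.get(nextIndex));
            nextIndex++;
        }
        return ready;
    }
}
```

With the names file1.csv and file2.csv, offering file2.csv first returns an 
empty list; offering file1.csv afterwards returns both names, file1.csv first.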

Hope this is helpful.

Thanks
-Mark

On Mar 7, 2021, at 1:09 AM, Vibhath Ileperuma wrote:

Hi all,

I have a set of CSV files stored in an S3 bucket, each containing 'x_val' and 
'y_val' columns.
Ex:
file1.csv
x_val, y_val
1,2
2,5
3,2
file2.csv
x_val, y_val
4,8
5,3
6,5

I need to:

  1.  List the CSV files in alphabetical order.
  2.  Add a new column 'prev_y_val'
     *   which should contain the value of the 'y_val' column of the previous 
row.
     *   In the first row of a file, this column should contain the value of 
the 'y_val' column in the last row of the previous file (only the first row of 
the first file can be null). Even if the NiFi instance is killed while 
processing a file, it should still be able to write the value in the first row 
correctly.

Ex:
file1.csv
x_val, y_val,prev_y_val
1,2,
2,5,2
3,2,5
file2.csv
x_val, y_val,prev_y_val
4,8,2
5,3,8
6,5,3
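A minimal plain-Java sketch of the transformation described above, with a Map 
standing in for the processor’s state. PrevYValAppender, processAll and the 
last_y_val key are hypothetical names, and the CSV handling is naive (header on 
the first line, y_val in the second column, no quoted fields). In a real 
processor the state update would go through ProcessSession.setState(), so that 
it is committed together with the output; if NiFi dies before that commit, the 
file should simply be reprocessed from the previous state value, which is what 
keeps the first row correct.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch in plain Java (no NiFi classes are used): the per-file
// logic a custom processor could run, with a Map standing in for its state.
class PrevYValAppender {
    // Assumed state key name; any name works if it is used consistently.
    static final String LAST_Y_KEY = "last_y_val";

    // Appends a prev_y_val column to one CSV file (header line first, y_val in
    // the second column, no quoted fields). The first data row gets the y_val
    // carried over in state, or an empty value if there is none yet.
    static String process(String csv, Map<String, String> state) {
        String[] lines = csv.split("\n");
        StringBuilder out = new StringBuilder(lines[0].trim()).append(",prev_y_val\n");
        String prev = state.getOrDefault(LAST_Y_KEY, "");
        for (int i = 1; i < lines.length; i++) {
            String row = lines[i].trim();
            if (row.isEmpty()) {
                continue; // ignore blank lines
            }
            String y = row.split(",")[1].trim();
            out.append(row).append(',').append(prev).append('\n');
            prev = y;
        }
        // Record this file's last y_val only after every row has been handled.
        // In a processor, this is where ProcessSession.setState() would be
        // called, so the value is committed together with the output FlowFile.
        state.put(LAST_Y_KEY, prev);
        return out.toString();
    }

    // Processes files one at a time, in alphabetical order of their names,
    // carrying the state from each file into the next.
    static Map<String, String> processAll(Map<String, String> filesByName,
                                          Map<String, String> state) {
        Map<String, String> results = new TreeMap<>();
        for (Map.Entry<String, String> e : new TreeMap<>(filesByName).entrySet()) {
            results.put(e.getKey(), process(e.getValue(), state));
        }
        return results;
    }
}
```

Running processAll on the two example files, starting from an empty state, 
yields exactly the expected output shown above and leaves last_y_val = 5 in 
state for whatever file comes next.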

I'd be grateful if you could suggest a way to implement this logic. If a custom 
processor is required, could you please suggest best practices for managing 
state between two FlowFiles?

Thanks & Regards

Vibhath Ileperuma

