Thank you, Mark, for your response. It will be really helpful when
implementing the processor.

Thank You,
Vibhath

On Mon, 8 Mar 2021, 7:53 pm Mark Payne, <[email protected]> wrote:

> I suspect that you would indeed want to create a custom processor to
> handle this. I’m not sure what kind of performance requirements you might
> have for this, but some things you’d need to think through:
>
> - To guarantee ordering you would have to avoid processing until all
> FlowFiles have been queued up. Or if there’s some mechanism that you can
> use to understand which FlowFile will come next, at least until the “next”
> FlowFile is available. It would be up to you to figure out the logic for
> how to determine that all data that is needed is available.
>
> - This would need to be single-threaded, to ensure that the FlowFiles are
> processed in-order. So you’d want to use the @TriggerSerially annotation.
>
> - Assuming that you don’t allow for some more sophisticated partitioning
> logic, and that you do, in fact, require that each FlowFile be processed
> after the other, it means that in a cluster, all data must be funneled to a
> single node. You can do this by configuring the Connection into this
> processor with a Load Balancing Strategy of “Single Node.”
>
> - You will need to save state via the ProcessSession.setState() method.
> You could potentially use a scope of LOCAL, but if you’re running a cluster
> and decide to move processing from one node to another at some point, then
> you’d lose the state, so you may want to use CLUSTER state. Perhaps the
> only thing necessary to save in state is the y_val of the last row seen.
>
> - This approach, where you remember the last value from the previous
> FlowFile, is very non-scalable, as the data must be processed sequentially.
> If your data volumes are high at all, you’ll certainly want to spend some
> time thinking about this approach to see if there’s a better way to attack
> your problem.
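As a rough illustration of the carry-over logic described above, here is a plain-Java sketch (the `addPrevYVal` method and the `last_y_val` state key are illustrative names, not NiFi API; in a real processor the string-keyed map would be read and written through the processor's state methods, since NiFi persists component state as a Map of Strings):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PrevYValSketch {
    // Adds a prev_y_val column to one file's rows, carrying the last
    // y_val across files via a string-keyed state map. Key name
    // "last_y_val" is an illustrative assumption.
    static List<String> addPrevYVal(List<String> rows, Map<String, String> state) {
        List<String> out = new ArrayList<>();
        String prev = state.getOrDefault("last_y_val", ""); // empty only before the first file
        for (String row : rows) {
            String[] cols = row.split(",");
            String yVal = cols[1].trim();
            out.add(row + "," + prev);
            prev = yVal;
        }
        // Persist the last y_val only after the whole file is processed,
        // so a restart mid-file still sees the previous file's value.
        state.put("last_y_val", prev);
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        List<String> file1 = List.of("1,2", "2,5", "3,2");
        List<String> file2 = List.of("4,8", "5,3", "6,5");
        System.out.println(addPrevYVal(file1, state)); // [1,2,, 2,5,2, 3,2,5]
        System.out.println(addPrevYVal(file2, state)); // [4,8,2, 5,3,8, 6,5,3]
    }
}
```

Committing the output FlowFile and the updated state together in the same session is what makes the first row of each file survive a mid-file crash.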
>
> Hope this is helpful.
>
> Thanks
> -Mark
>
> On Mar 7, 2021, at 1:09 AM, Vibhath Ileperuma <[email protected]>
> wrote:
>
> Hi all,
>
> I have a set of CSV files, each containing 'x_val' and 'y_val' columns,
> stored in an S3 bucket.
> Ex:
> *file1.csv*
>
> *x_val, y_val*
> *1,2*
> *2,5*
> *3,2*
>
> *file2.csv*
>
> *x_val, y_val*
> *4,8*
> *5,3*
> *6,5*
>
> I need to,
>
>    1. List the CSV files in alphabetical order.
>    2. Add a new column 'prev_y_val':
>       1. It should contain the value of the 'y_val' column of the
>       previous row.
>       2. In the first row of a file, this column should contain the value
>       of the 'y_val' column in the last row of the previous file (only the
>       first row of the first file can be null). Even if the NiFi instance
>       is killed while processing a file, it should still write the value
>       of the first row correctly.
>
> Ex:
> *file1.csv*
>
> *x_val, y_val,prev_y_val*
> *1,2,*
> *2,5,2*
> *3,2,5*
>
> *file2.csv*
>
> *x_val, y_val,prev_y_val*
> *4,8,2*
> *5,3,8*
> *6,5,3*
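One side note on step 1: plain lexicographic ordering sorts "file10.csv" before "file2.csv", so numbered file names may need zero padding or a numeric-aware comparator. A small stdlib illustration (the file names here are made up):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FileOrderSketch {
    public static void main(String[] args) {
        List<String> names = new ArrayList<>(List.of("file10.csv", "file2.csv", "file1.csv"));
        // Natural String ordering compares character by character, so
        // "file10.csv" sorts before "file2.csv".
        names.sort(Comparator.naturalOrder());
        System.out.println(names); // [file1.csv, file10.csv, file2.csv]
    }
}
```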
>
>
> I'd be grateful if you could suggest a way to implement this logic. If a
> custom processor is required, could you please suggest best practices
> for state management between two FlowFiles?
>
> Thanks & Regards
>
> *Vibhath Ileperuma*
>