Dries,

The short answer is that, depending on your source, destination, and what 
you’re doing in between, it is sometimes (but not always) possible, though not 
particularly simple.

The longer version:

NiFi doesn’t strive to provide strict ordering guarantees. Rather, it strives 
to provide data prioritization. Consider a use case where NiFi is reading a 
temperature sensor on an oil rig with poor comms. Comms go out and come back 30 
minutes later. If there’s a fire, we don’t want to work through the 1,000 
readings taken during those 30 minutes in order; if one of them says there’s a 
fire, we want that one first. This is achieved using FlowFile Prioritizers.
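To make the prioritization idea concrete, here is a minimal Python sketch of a queue that releases "fire" readings ahead of routine ones while keeping equal-priority readings in arrival order. The class name, the threshold, and the readings are hypothetical illustrations of the concept, not NiFi's prioritizer API.

```python
import heapq
from dataclasses import dataclass, field
from itertools import count


@dataclass(order=True)
class QueuedReading:
    # Sort key: alarms (priority 0) before routine readings (priority 1),
    # then arrival sequence, so equal-priority readings stay first-in-first-out.
    priority: int
    arrival: int
    value: float = field(compare=False)


class PrioritizedQueue:
    """Hypothetical queue that releases fire alarms before routine readings."""

    def __init__(self, fire_threshold: float) -> None:
        self._heap: list[QueuedReading] = []
        self._arrivals = count()
        self._fire_threshold = fire_threshold

    def put(self, value: float) -> None:
        priority = 0 if value >= self._fire_threshold else 1
        heapq.heappush(self._heap, QueuedReading(priority, next(self._arrivals), value))

    def get(self) -> float:
        return heapq.heappop(self._heap).value

    def __len__(self) -> int:
        return len(self._heap)


# Backlog built up while comms were down: routine readings plus one fire reading.
backlog = [21.0, 21.5, 22.0, 350.0, 22.5, 23.0]
queue = PrioritizedQueue(fire_threshold=100.0)
for reading in backlog:
    queue.put(reading)

drained = [queue.get() for _ in range(len(queue))]
print(drained)  # [350.0, 21.0, 21.5, 22.0, 22.5, 23.0]
```

The fire reading jumps the queue, and everything else keeps its relative order. Ordering is preserved only *within* a priority level, which is exactly why prioritization is not the same thing as a strict ordering guarantee.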

One such prioritizer is the FirstInFirstOutPrioritizer. With it, data in a 
given queue is processed in the order in which it arrived in the queue. So, for a 
strictly linear flow (i.e., a flow that goes from Processor A to B to C, 
without any routing/decision making), this provides strict ordering. But 
if a FlowFile is penalized, or if it is routed to a ‘failure’ relationship, the 
data can get out of order.
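The effect of failure routing can be shown with a simplified Python model of one processor draining a FIFO queue. It assumes the ‘failure’ connection loops straight back into the same queue and that FIFO ordering is by the time an item entered the queue; penalization is not modeled. For contrast it also models a rollback-style retry, where the failed item stays at the head of the queue. `run_linear_flow` and its strategy names are hypothetical.

```python
from collections import deque


def run_linear_flow(items, fails_once, strategy):
    """Drain a FIFO queue; each item in `fails_once` fails on its first attempt.

    strategy="route_to_failure": the failed item goes out a failure connection
    that loops back, so it re-enters the queue behind everything already waiting.
    strategy="rollback": the failed item stays at the head and is retried next.
    """
    queue = deque(items)
    already_failed = set()
    output = []
    while queue:
        item = queue.popleft()
        if item in fails_once and item not in already_failed:
            already_failed.add(item)
            if strategy == "route_to_failure":
                queue.append(item)      # back of the line
            elif strategy == "rollback":
                queue.appendleft(item)  # retried before anything else
            else:
                raise ValueError(f"unknown strategy: {strategy}")
            continue
        output.append(item)
    return output


print(run_linear_flow([1, 2, 3, 4, 5], {2}, "route_to_failure"))  # [1, 3, 4, 5, 2]
print(run_linear_flow([1, 2, 3, 4, 5], {2}, "rollback"))          # [1, 2, 3, 4, 5]
```

A single transient failure on item 2 is enough to reorder the output when the item is routed to failure, while retrying it in place keeps the original order.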

However, a common flow that we do see is to have Debezium (or something 
similar) monitor a database for changes and publish CDC events to Kafka. 
NiFi then has a data flow that looks like ConsumeKafkaRecord_2_6 -> (possibly 
UpdateRecord/LookupRecord, etc. to perform enrichment/filtering/updating) -> 
PutDatabaseRecord (or PutKudu). In this flow, all connections use the FIFO 
prioritizer, and PutDatabaseRecord / PutKudu is configured so that it won’t 
route to failure; instead, if there’s a failure, it rolls back the session. 
This handles ordering once the data is on the node, but the data must also 
arrive from Kafka in the correct order. So, for this case, you must also pin 
specific Kafka partitions to specific NiFi nodes, which can be done by adding 
user-defined properties to the consumer processor, as described in its 
documentation.
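Kafka only guarantees ordering within a partition, so the goal of pinning is that each partition is always consumed by the same node. The Python sketch below spreads partitions round-robin over the cluster and emits one user-defined property per node, using the `partitions.<hostname>` naming that, as far as we recall, the ConsumeKafka processors document for static partition assignment. Treat that naming as an assumption to verify against the documentation for your NiFi version, which also lists further requirements. The helper and the hostnames are hypothetical.

```python
def partition_properties(hostnames, partition_count):
    """Assign each Kafka partition to exactly one NiFi node (round-robin) and
    return the user-defined property names/values to add to the consumer.

    Assumes the partitions.<hostname> convention; verify it in the NiFi docs.
    """
    if not hostnames:
        raise ValueError("need at least one NiFi node")
    assignment = {host: [] for host in hostnames}
    for partition in range(partition_count):
        host = hostnames[partition % len(hostnames)]
        assignment[host].append(partition)
    return {
        f"partitions.{host}": ", ".join(str(p) for p in parts)
        for host, parts in assignment.items()
    }


props = partition_properties(["nifi-01", "nifi-02", "nifi-03"], partition_count=8)
for name, value in props.items():
    print(f"{name} = {value}")
# partitions.nifi-01 = 0, 3, 6
# partitions.nifi-02 = 1, 4, 7
# partitions.nifi-03 = 2, 5
```

Because every partition has a single owning node, messages for a given key (assuming the producer keys CDC events so that one row's changes land in one partition) are always handled by one node, where the FIFO-plus-rollback setup above keeps them in order.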

Thanks
-Mark

On Apr 1, 2021, at 8:10 AM, Boris Tyukin wrote:

We ended up building a simple Groovy processor that uses a MySQL database to 
queue up FlowFiles. If FlowFile A fails, FlowFile B sits in the queue until we 
address the issue with FlowFile A. We also used the back pressure feature to 
slow down the upstream Kafka consumers.
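The head-of-line blocking behaviour described above can be sketched in Python. This is not the Groovy processor from the message: SQLite (in memory) stands in for MySQL, all names are hypothetical, and a single consumer is assumed; a real multi-node setup would also need row locking, which this sketch omits.

```python
import sqlite3


def open_queue():
    # In-memory SQLite stands in for the MySQL table described above.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE pending (seq INTEGER PRIMARY KEY, payload TEXT NOT NULL)")
    return conn


def enqueue(conn, seq, payload):
    conn.execute("INSERT INTO pending (seq, payload) VALUES (?, ?)", (seq, payload))
    conn.commit()


def drain_in_order(conn, handler):
    """Process rows strictly by seq. If the head row fails, stop and leave it
    (and everything behind it) in the table so later rows cannot overtake it."""
    processed = []
    while True:
        row = conn.execute("SELECT seq, payload FROM pending ORDER BY seq LIMIT 1").fetchone()
        if row is None:
            return processed, None
        seq, payload = row
        try:
            handler(payload)
        except Exception as exc:  # blocked until the problem is fixed
            return processed, (seq, exc)
        conn.execute("DELETE FROM pending WHERE seq = ?", (seq,))
        conn.commit()
        processed.append(seq)


conn = open_queue()
for seq, payload in [(1, "A"), (2, "B"), (3, "C")]:
    enqueue(conn, seq, payload)

broken = {"A"}  # pretend the write for A fails until someone intervenes


def write_to_destination(payload):
    if payload in broken:
        raise RuntimeError(f"cannot write {payload}")


done, blocked = drain_in_order(conn, write_to_destination)
print(done, blocked[0])  # [] 1
broken.clear()  # the issue with A has been addressed
done, blocked = drain_in_order(conn, write_to_destination)
print(done, blocked)  # [1, 2, 3] None
```

B and C wait behind A instead of overtaking it; once A succeeds, all three are written in sequence.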

After experimenting with Wait/Notify, we found it extremely difficult and 
cumbersome. EnforceOrder was not really doing much for us either. Our use case 
was to process Kafka messages in order on a 3-node NiFi cluster.

In the end, it worked really well for us.

On Thu, Apr 1, 2021, 03:35 Van Autreve Dries wrote:
Hello all

We recently started using NiFi, and we were wondering whether NiFi can 
guarantee strict ordering when processing FlowFiles in a cluster.

One of the use cases is as follows: messages arrive in a specific order, go 
through a simple flow with some basic transformations, and are written to the 
destination (usually a relational database). The source of the messages can be 
a database, Kafka queue, …
It’s important that messages are written to the destination in exactly the 
same order in which they arrived at NiFi, because messages could be deltas and 
we do not want to overwrite newer data with older deltas. Moreover, we do not 
always control the message format, so handling this at the messaging-protocol 
level might not be possible.

We did some research in various places but have not found a satisfying answer. 
Our own investigations have revealed that:
- Just running the first processor on the primary node is not enough, even 
with the “single node” load balancing strategy. While testing by stopping and 
starting the primary node, we had some situations where messages got out of 
order.
- Using the EnforceOrder processor with high timeouts prevented messages from 
being processed out of order, but each time the primary node changes, manual 
intervention is required to reconfigure the initial order property. Moreover, 
it requires that the source system or first processor provides this 
incrementing sequence attribute.
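The EnforceOrder behaviour described in the list above (release by an incrementing sequence attribute, starting from an initial order) can be modeled as a reorder buffer. This is a hypothetical Python sketch of the concept, not EnforceOrder's implementation; it ignores wait timeouts and grouping.

```python
class ReorderBuffer:
    """Hypothetical reorder buffer: releases items strictly by an integer
    sequence number, holding early arrivals until the gap is filled."""

    def __init__(self, initial_order):
        self.expected = initial_order  # analogous to an initial-order setting
        self.held = {}

    def offer(self, seq, item):
        """Accept one item; return the items that can now be released, in order."""
        if seq < self.expected:
            raise ValueError(f"sequence {seq} already passed (expected {self.expected})")
        self.held[seq] = item
        released = []
        while self.expected in self.held:
            released.append(self.held.pop(self.expected))
            self.expected += 1
        return released


buf = ReorderBuffer(initial_order=1)
print(buf.offer(2, "b"))  # []
print(buf.offer(3, "c"))  # []
print(buf.offer(1, "a"))  # ['a', 'b', 'c']

# If the configured initial order does not match the first sequence number
# that actually arrives, nothing is ever released:
stuck = ReorderBuffer(initial_order=1)
print(stuck.offer(101, "x"))  # []
```

The second case is why an incorrect initial order needs manual intervention: the buffer keeps waiting for a sequence number that will never arrive.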

It also does not seem possible to pin a flow to a specific node; at least, we 
have not found such an option. We understand that this would affect 
scalability and availability/failover, but it might be acceptable for these 
specific cases.

If there are other options we can explore, any input would be helpful.
Or if it’s not (easily) possible with NiFi on its own, it would be good to know!

--
Kind Regards
Dries Van Autreve


(Sorry if this results in a double post. I was not yet subscribed when I 
made the first post, and my message does not seem to appear in the list...)

