Dries,

The short answer is that, depending on your source, destination, and what you’re doing in between, it is sometimes (but not always) possible, though not particularly simple.
The longer version: NiFi doesn’t strive to provide strict ordering guarantees. Rather, it strives to provide data prioritization. Consider a use case where NiFi is reading a temperature sensor on an oil rig with poor comms. Comms go out and, 30 minutes later, they come back. If there’s a fire, we don’t want to work through the 1,000 readings that were taken during those 30 minutes first; if there’s one that says there’s a fire, we want that one first. This is achieved using FlowFile Prioritizers.

One such prioritizer is the FirstInFirstOutPrioritizer. Using this, data in a given queue is processed in the order that it arrived in the queue. So, for a strictly linear flow (i.e., a flow that goes from Processor A to B to C, without any routing/decision making), this works as a strict ordering. But if a FlowFile is penalized, or if it is routed to a ‘failure’ relationship, then the data can get out of order.

However, a common flow that we do see is to have Debezium (or something similar) monitoring a database for changes and publishing CDC events to Kafka. Then NiFi has a data flow that looks like ConsumeKafkaRecord_2_6 -> (possibly UpdateRecord/LookupRecord, etc. to perform enrichment/filtering/updating) -> PutDatabaseRecord (or PutKudu). In this flow, all connections use the FIFO Prioritizer. You also need to ensure that PutDatabaseRecord / PutKudu is configured so that it won’t route to failure; instead, if there’s a failure, it rolls back the session.

Now, this handles the concern of ordering once the data is on the node, but the data must also arrive in the correct order from Kafka. So, for this case, you must also pin specific Kafka partitions to specific NiFi nodes, which can be done by adding user-defined properties, as described in the documentation.

Thanks
-Mark

On Apr 1, 2021, at 8:10 AM, Boris Tyukin <[email protected]> wrote:

We ended up building a simple Groovy processor that uses a MySQL database to queue up FlowFiles.
If FlowFile A fails, FlowFile B sits in the queue until we address the issue with FlowFile A. We also used the back pressure feature to slow down the upstream Kafka consumers. After playing with Wait/Notify, we found it extremely difficult and cumbersome. EnforceOrder was not really doing much for us either. Our use case was to process Kafka messages in order on a 3-node NiFi cluster. It worked really well for us in the end.

On Thu, Apr 1, 2021, 03:35 Van Autreve Dries <[email protected]> wrote:

Hello all

We recently started using NiFi and we were wondering whether NiFi can guarantee a strict order of processing FlowFiles in a cluster. One of the use cases is as follows: messages arrive in a specific order, go through a simple flow with some basic transformations, and are written to the destination (usually a relational database). The source of the messages can be a database, a Kafka queue, … It’s important that messages are written to the destination in exactly the same order they arrived at NiFi. The reason is that messages could be deltas and we do not want to overwrite newer data with older deltas. Moreover, we do not always control the message format, hence controlling this from the messaging protocol point of view might not be possible.

We did some research in various places but have not found a satisfying answer. Our own investigations have revealed that:

- Just running the first processor on the primary node is not enough, even with the load balancing strategy “single node”. While testing with stopping / starting the primary node, we had some situations where messages got out of order.
- Using the EnforceOrder processor with high timeouts prevented the messages from being processed out of order, but each time the primary node changes, manual intervention is required to reconfigure the initial order property. Moreover, it requires that the source system or first processor provides this incrementing sequence attribute.
It also does not seem possible to pin a flow to a specific node. At least we have not found this option. We do understand that this would affect scalability and availability or failover, but that might be acceptable for those specific cases.

If there are other options we can explore, any input would be helpful. Or if it’s not (easily) possible with NiFi on its own, it would be good to know!

--
Kind Regards
Dries Van Autreve

(Sorry if this will result in a double post. I was not yet subscribed when I did the first post and my message does not seem to appear in the list...)
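
A note on the point in Mark's reply that FIFO ordering stops being strict once a FlowFile is penalized: the toy simulation below is only a sketch of that effect, not NiFi code. The names (`drain`, `penalize_ids`, `penalty_ticks`) are hypothetical, and the model assumes that a penalized FlowFile keeps its place in the queue but is skipped until its penalty expires, while the FlowFiles behind it carry on.

```python
from collections import deque


def drain(flowfile_ids, penalize_ids=frozenset(), penalty_ticks=3):
    """Return the order in which a toy FIFO queue hands out FlowFiles.

    Simplified model (an assumption, not NiFi internals): every id in
    penalize_ids fails on its first attempt, is penalized for
    penalty_ticks scheduling ticks, and is skipped until then.
    """
    # Each entry is (flowfile id, tick at which it becomes eligible).
    queue = deque((fid, 0) for fid in flowfile_ids)
    already_penalized = set()
    processed = []
    tick = 0
    while queue:
        tick += 1
        # FIFO among eligible entries: take the first one whose penalty expired.
        for index, (fid, eligible_at) in enumerate(queue):
            if eligible_at <= tick:
                break
        else:
            continue  # everything is still penalized; wait for the next tick
        del queue[index]
        if fid in penalize_ids and fid not in already_penalized:
            already_penalized.add(fid)
            # Put it back in its original position, but ineligible for a while.
            queue.insert(index, (fid, tick + penalty_ticks))
        else:
            processed.append(fid)
    return processed


if __name__ == "__main__":
    print(drain([1, 2, 3, 4]))
    print(drain([1, 2, 3, 4], penalize_ids={2}))
```

With no penalties the result matches the arrival order. When FlowFile 2 is penalized, FlowFiles 3 and 4 overtake it, which is the kind of reordering that the flow in Mark's reply is configured to avoid.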

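On pinning Kafka partitions to NiFi nodes: to the best of my knowledge, the ConsumeKafka documentation that Mark refers to describes user-defined properties of the form `partitions.<hostname>`, each holding a comma-separated list of partition numbers, with every partition of the topic assigned to some node. Please treat that property format as an assumption and check it against the documentation for your NiFi version. The helper below is a hypothetical sketch (the function name and the hostnames are made up) that spreads partitions round-robin across the nodes and prints the property names and values you would enter.

```python
def partition_properties(hostnames, partition_count):
    """Build property name -> value pairs that assign partitions to nodes.

    Assumption: the processor accepts properties named
    'partitions.<hostname>' whose value is a comma-separated list of
    partition numbers. Verify this against the NiFi documentation.
    """
    if not hostnames:
        raise ValueError("at least one hostname is required")
    if partition_count < len(hostnames):
        # Nodes without any partition are left out of scope for this sketch.
        raise ValueError("need at least one partition per node")
    assigned = {host: [] for host in hostnames}
    for partition in range(partition_count):
        # Round-robin: partition 0 -> first node, 1 -> second node, and so on.
        host = hostnames[partition % len(hostnames)]
        assigned[host].append(partition)
    return {
        f"partitions.{host}": ", ".join(str(p) for p in parts)
        for host, parts in assigned.items()
    }


if __name__ == "__main__":
    # Hypothetical 3-node cluster consuming a 6-partition topic.
    for name, value in partition_properties(["nifi-01", "nifi-02", "nifi-03"], 6).items():
        print(f"{name} = {value}")
```

Because each partition is always read by the same node, messages within a partition stay on one node, where the FIFO prioritizer can preserve their order.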