Hi 

I did a deeper analysis of the data provenance, the NiFi logs and the Kafka
logs. Let me explain my NiFi flow and the observed behaviour again:

NiFi version 1.18.0 deployed on Windows 7 (single node).
Kafka version 2.7.0 (3 brokers).

NiFi flow: 



All queues use the FirstInFirstOutPrioritizer, and each processor has the
following configuration:

- ConsumeAMQP: 


- PublishKafka_2_6:

- PublishKafka_2_6_normal:


Both PublishKafka processors send data to the same Kafka cluster (3 nodes),
to two different topics, each with one partition and a replication factor of
2. Outgoing FlowFiles from the ConsumeAMQP processor contain a single JSON
document per FlowFile, 65 bytes in size. With this configuration, we
occasionally receive duplicated messages in TOPIC_1.

With that in mind, I analyzed one batch of duplicated messages we received
between 11/28/2022 07:15:30 GMT and 11/28/2022 07:15:32 GMT. Here are the
results:

1. Kafka logs show no rebalancing or repartitioning of the topic in that
period.
2. NiFi logs show no error, warning or any other indication that something
went wrong in that period.
3. Data provenance shows that those messages were neither duplicated in the
AMQP broker nor at any point in the NiFi flow, BUT the duplicate messages seen
in Kafka have a Lineage Duration of nearly 40 seconds (the Data Provenance
analysis is attached).

My conclusion is that some messages are not receiving the expected ACK from
Kafka, so NiFi resends them. But this raises some questions for me:

1. First of all, if Acknowledgment Wait Time is set to 5 s, it makes no sense
for messages to have such a long Lineage Duration (40 s). What is going on
during this time?
2. If an ACK is not received from Kafka, shouldn't these FlowFiles be routed
to failure and, consequently, an error be written to the logs? Is the Rollback
failure strategy hiding this ACK timeout error?
3. If a message is sent twice to Kafka by the same producer, shouldn't the
idempotence property handle it so that no duplicated message is written to the
Kafka topic?
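Question 3 depends on what the broker actually deduplicates. As I understand Kafka's idempotent producer, the broker remembers, per producer ID and partition, the sequence numbers it has already appended and drops a batch it has seen before. That covers the client's own internal retries. However, a record that the application passes to send() a second time (for example, if a rollback makes NiFi publish the same FlowFile again) gets a new sequence number and is written again. The toy model below illustrates that distinction under simplifying assumptions. It tracks single records rather than batches, and only the last sequence number rather than the broker's cache of recent batches. All class and method names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPartitionLog:
    """Hypothetical model of one partition written by idempotent producers."""
    records: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)  # producer ID -> last sequence appended

    def append(self, producer_id: int, seq: int, value: str) -> str:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # Already written: a retry of the same record, dropped by the broker.
            return "duplicate-ignored"
        if seq != last + 1:
            # A gap in the sequence; the real broker rejects such a batch.
            return "out-of-order-rejected"
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return "appended"


class ToyIdempotentProducer:
    """Gives every send() call a fresh sequence number; internal retries reuse it."""

    def __init__(self, producer_id: int, log: ToyPartitionLog):
        self.producer_id = producer_id
        self.log = log
        self.next_seq = 0

    def send(self, value: str) -> int:
        seq = self.next_seq
        self.next_seq += 1
        self.log.append(self.producer_id, seq, value)
        return seq

    def internal_retry(self, seq: int, value: str) -> str:
        # Re-send the same record with the same sequence, as the client does after a timeout.
        return self.log.append(self.producer_id, seq, value)


log = ToyPartitionLog()
producer = ToyIdempotentProducer(producer_id=1, log=log)

seq = producer.send('{"id": 42}')
producer.internal_retry(seq, '{"id": 42}')  # same sequence: deduplicated
producer.send('{"id": 42}')                  # application-level resend: new sequence, written again
print(log.records)  # ['{"id": 42}', '{"id": 42}']
```

Under this model, idempotence only protects against the first kind of resend.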

Some final considerations:

- We are using TOPIC_2 just as a backup, and we do not know whether it
contains any duplicated messages.
- From what I understood after reading the documentation, setting
Transactions to true and Delivery Guarantee to Guarantee Replicated Delivery
implies the use of an idempotent producer. Since the default value of
max.in.flight.requests.per.connection is 5, is there any difference between
these two processors?
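To make the comparison in the last point concrete, here is a small sketch. It checks a producer property set against the idempotence rules of the Kafka 2.x Java client as I understand them:

- Setting transactional.id turns idempotence on.
- Otherwise, enable.idempotence defaults to false.
- An idempotent producer needs acks=all and max.in.flight.requests.per.connection of at most 5.

The property values are assumptions, not values read from NiFi. I assume that Guarantee Replicated Delivery maps to acks=all, that Use Transactions results in a transactional.id being set, and that the other processor sets enable.idempotence=true explicitly. The transactional ID shown is hypothetical.

```python
def idempotence_enabled(props: dict) -> bool:
    """Would a Kafka 2.x-style producer with these properties be idempotent?"""
    explicit = props.get("enable.idempotence")
    if props.get("transactional.id"):
        # Transactional producers are always idempotent; explicitly disabling
        # idempotence is a configuration error in the real client.
        if explicit == "false":
            raise ValueError("transactional.id requires enable.idempotence=true")
        return True
    return explicit == "true"  # the 2.x client default is false


def idempotence_problems(props: dict) -> list:
    """List the settings that rule out idempotent delivery (empty if none)."""
    if not idempotence_enabled(props):
        return ["idempotence disabled: producer retries may write duplicates"]
    problems = []
    if props.get("acks", "all") not in ("all", "-1"):
        problems.append("acks must be 'all'")
    if int(props.get("max.in.flight.requests.per.connection", "5")) > 5:
        problems.append("max.in.flight.requests.per.connection must be <= 5")
    return problems


# Assumed property sets (not read from NiFi).
explicit_idempotence = {"enable.idempotence": "true", "acks": "all"}
transactional = {"transactional.id": "nifi-tx-1", "acks": "all"}  # hypothetical ID
plain = {"acks": "1"}  # neither option set, for contrast

for name, props in [("explicit", explicit_idempotence),
                    ("transactional", transactional),
                    ("plain", plain)]:
    print(name, idempotence_problems(props) or "OK")
```

If those assumptions hold, both processors end up with an idempotent producer under the same in-flight limit. Any remaining difference would then lie elsewhere, for example in whether records are written inside transactions.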


Thank you in advance, 

Aian 



From: "Pierre Villard" <[email protected]>
To: "users" <[email protected]>
CC: "Aian Cantabrana" <[email protected]>, "Joe Witt" <[email protected]>
Sent: Saturday, 19 November 2022 20:21:45
Subject: Re: Exactly once from NiFi to Kafka

You don't need to run the Kafka processor only on the primary node. The Kafka 
client will take care of doing the proper partition-consumer assignment across 
the nodes and threads in the NiFi cluster. 
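Pierre's point can be illustrated with a simplified assignment: consumers that share the same group ID split the partitions so that each partition is read by exactly one of them. The sketch below uses a plain round-robin over sorted partitions. It is similar in spirit to Kafka's RoundRobinAssignor but is not its actual implementation, and the consumer names are hypothetical. It assumes all NiFi nodes and threads use the same group ID; consumers in different groups would each receive every record.

```python
def round_robin_assign(partitions, consumers):
    """Assign each partition to exactly one consumer in the group."""
    ordered = sorted(consumers)
    assignment = {c: [] for c in ordered}
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(partition)
    return assignment


# A 3-partition topic read by two NiFi nodes, each running two consumer threads.
consumers = ["node1-t1", "node1-t2", "node2-t1", "node2-t2"]
print(round_robin_assign([0, 1, 2], consumers))
# {'node1-t1': [0], 'node1-t2': [1], 'node2-t1': [2], 'node2-t2': []}
```

With a single-partition topic, as in this thread, only one consumer in the group would be active at a time.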

We're aware of many NiFi users consuming data from Kafka at very large scale
without any duplication issue.

As mentioned before, I'd recommend looking at the provenance data for all
generated FlowFiles to really understand where duplication may be happening.
I'd also look at the Kafka logs and search for rebalancing / reassignment of
consumers / partitions.

Pierre 

On Sat, 19 Nov 2022, 19:33, Joe Obernberger <[email protected]> wrote:

Are you by chance using a clustered NiFi? I'm seeing duplicate messages if I
run the consumer on multiple NiFi nodes, so I've started running the consumer
only on the primary node. This seems to fix the issue, but leads to other
problems. I'd love a solution.

-Joe 
On 11/16/2022 3:50 AM, Aian Cantabrana wrote: 

BQ_BEGIN

Hi Joe, 

Thanks for the reply. The actual flow sends data from the ConsumeAMQP
processor to two different PublishKafka processors, one with idempotence and
the other with the default config. Each of them sends the same data to a
different topic, and comparing both topics is how I check for duplicates. It
seems to be random: sometimes they appear in the "normal" processor's topic
and other times in the "idempotence" one. I did not find any pattern.
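The topic comparison described above can be done with a simple count over the consumed payloads. This sketch makes two assumptions. First, the records of each topic have already been read into lists of strings. Second, a payload identifies a message uniquely; if two legitimate messages can carry identical JSON, counting payloads would overstate duplicates.

One more thing may be worth checking when reading the topics, as a possible factor rather than a diagnosis. A Kafka consumer uses isolation.level=read_uncommitted by default, so it still receives records from transactions that were aborted; read_committed filters those out.

```python
from collections import Counter


def find_duplicates(messages):
    """Return {payload: count} for payloads seen more than once."""
    return {msg: n for msg, n in Counter(messages).items() if n > 1}


def compare_topics(topic_1, topic_2):
    """Summarise duplicates in each topic and payloads present in only one of them."""
    return {
        "dups_topic_1": find_duplicates(topic_1),
        "dups_topic_2": find_duplicates(topic_2),
        "only_in_topic_1": sorted(set(topic_1) - set(topic_2)),
        "only_in_topic_2": sorted(set(topic_2) - set(topic_1)),
    }


# Hypothetical payloads read from the two topics.
topic_1 = ['{"id": 1}', '{"id": 2}', '{"id": 2}', '{"id": 3}']
topic_2 = ['{"id": 1}', '{"id": 2}', '{"id": 3}']
report = compare_topics(topic_1, topic_2)
print(report["dups_topic_1"])  # {'{"id": 2}': 2}
```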

I will upgrade to NiFi 1.18.0 and try again. 

In any case, the messages are in JSON format (one JSON document per FlowFile),
but since I send and store them in Kafka as plain text, I am using the
non-record-oriented Kafka publisher. Is PublishKafkaRecord more reliable?
Would it be better to use it?

Thanks, 

Aian 


From: "Joe Witt" <[email protected]>
To: "users" <[email protected]>
Sent: Tuesday, 15 November 2022 17:31:54
Subject: Re: Exactly once from NiFi to Kafka

Aian, 
How can you tell there are duplicates in Kafka and are you certain that no 
duplicates exist in the source topic? 

Given NiFi's data provenance capabilities, you should be able to pinpoint a
given duplicate and figure out whether it happened at the source, in NiFi, or
elsewhere.

Note that much has changed/improved since the 1.12.x line of NiFi, so we now
have more Kafka components and record-oriented mechanisms. Still, I'm pretty
sure that even in your version we should not be duplicating data unless the
flow is configured in a way that would cause it.

Thanks 

On Tue, Nov 15, 2022 at 9:25 AM Aian Cantabrana <[email protected]> wrote:

BQ_BEGIN

Hi, 

I am having some difficulties trying to get exactly-once semantics while
ensuring data order from NiFi to Kafka. I have read the Kafka documentation,
and it should be quite straightforward using an idempotent producer from NiFi
and a Kafka topic with a single partition, but I am still getting some
duplicated messages in Kafka.
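On ordering: a single partition keeps records in the order the broker appends them, but that order can differ from send order. The Kafka producer documentation notes that, without idempotence, retries combined with more than one in-flight request per connection can reorder records. With idempotence (and at most 5 in-flight requests), the broker rejects a batch that arrives after a gap in the sequence, so the client resends in order. The toy simulation below shows both outcomes when the first of three in-flight batches fails once. It is a simplification of the real protocol, and the function and batch names are hypothetical.

```python
def deliver(batches, fail_first_attempt, idempotent):
    """Toy model of in-flight batches reaching one partition.

    batches: batch names sent back-to-back, all in flight at once.
    fail_first_attempt: names whose first attempt fails and is retried later.
    idempotent: if True, the broker rejects a batch that follows a sequence gap.
    """
    log, retries = [], []
    next_expected = 0  # index of the next batch an idempotent broker will accept
    for i, batch in enumerate(batches):
        if batch in fail_first_attempt or (idempotent and i != next_expected):
            retries.append((i, batch))  # the client will resend this batch
            continue
        log.append(batch)
        next_expected = i + 1
    # Resend failed or rejected batches in their original order.
    log.extend(batch for _, batch in sorted(retries))
    return log


sent = ["b1", "b2", "b3"]
print(deliver(sent, {"b1"}, idempotent=False))  # ['b2', 'b3', 'b1']
print(deliver(sent, {"b1"}, idempotent=True))   # ['b1', 'b2', 'b3']
```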

NiFi version: 1.12.1 
Kafka version: 2.7.0 

NiFi flow: 
(Both queues shown use the FIFO prioritizer)

PublishKafka_2_6 configuration: 

As I said, the target Kafka topic has just one partition to ensure data order.

Incoming FlowFiles are small, 60-byte messages.

I have been working on this for a while, so any suggestion is really welcome.

Thanks in advance, 

Aian 





BQ_END



BQ_END


Attachment: NiFi-Data-Provenance-duplicated-messages.ods
Description: application/vnd.oasis.opendocument.spreadsheet
