ConsumeKafka just takes the messages as raw bytes and writes the content to disk. ConsumeKafkaRecord serializes/deserializes the data according to a schema and possibly converts the data into another format. So it's normal to see a huge difference in performance and CPU consumption.
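The cost difference can be illustrated with a toy sketch (plain Python, not NiFi code; the message format and function names are invented for illustration): the pass-through path just concatenates raw bytes with a demarcator, while the record path parses and re-serializes every single message.

```python
import json

# Invented sample payload: 1000 small JSON messages, as raw bytes.
messages = [b'{"id": %d, "value": "x"}' % i for i in range(1000)]

# ConsumeKafka-style pass-through: raw bytes are joined with a demarcator
# and written out; no per-record parsing happens.
def passthrough(msgs, demarcator=b"\n"):
    return demarcator.join(msgs)

# ConsumeKafkaRecord-style processing: every message is deserialized
# against a schema and re-serialized, possibly into another format.
# The per-record parse/serialize work is where the extra CPU goes.
def record_oriented(msgs):
    records = [json.loads(m) for m in msgs]       # deserialize each record
    return json.dumps(records).encode("utf-8")    # re-serialize as one batch

raw = passthrough(messages)
converted = record_oriented(messages)
```

The pass-through path does O(1) work per message (a copy); the record path does a full parse and re-encode per message, which is why the gap widens at high message rates.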
Regarding the number of messages per flow file: IIRC, I always had a good amount of messages per flow file and a consistent number of messages across flow files (in the use case I mentioned earlier in this thread). It was a long time ago, but I think I remember adding a dynamic property to set a Kafka property; I can't remember which one. Trying to find out.

Pierre

On Mon, 22 Jun 2020 at 17:52, <josef.zahn...@swisscom.com> wrote:

> Hi Mark,
>
> It really doesn't matter what I configure for "Max Poll Records" (tried 10, 1’000, 100’000) or for "Max Uncommitted Time" (tried 1s, 10s, 100s). The flowfile size is always randomly between 1 and about 500 records, so those two parameters don't work at all in my case. The theory behind them is clear, but they don't work as expected… Of course the queue was more than full.
>
> In the meantime I've done some tests with the ConsumeKafka processor, and the performance difference is again huge: 4 times better performance (1 million messages per second) with the same topic and number of threads for the non-record processor, at which point we hit the network limit. It seems that the RecordReader/RecordWriter part of the ConsumeKafkaRecord processor consumes a lot of CPU power. Interesting that nobody has complained about it until now; are we unusual in having a few 100’000 messages per second? We have sources which produce about 200’000 messages/s, and we would like to be able to consume a few times faster than we produce.
>
> We now plan to implement a KafkaAvroConsumer, based on the ConsumeKafka processor. It will consume from Kafka and write Avro out instead of the plain messages with a demarcator. We hope to get the same great performance as with the ConsumeKafka processor.
> Cheers Josef
>
> From: Mark Payne <marka...@hotmail.com>
> Reply to: "users@nifi.apache.org" <users@nifi.apache.org>
> Date: Monday, 22 June 2020 at 15:03
> To: "users@nifi.apache.org" <users@nifi.apache.org>
> Subject: Re: ConsumeKafkaRecord Performance Issue
>
> Josef,
>
> The Max Poll Records value just gets handed to the Kafka client and limits how much is pulled back in a single request. This can be important because if you set the value really high, you could potentially buffer up a lot of messages in memory and consume a lot of heap. But setting it too low would result in small batches. So that property can play a role in the size of a batch of records, but you should not expect the output batches to necessarily match the value set there.
>
> What value do you have set for "Max Uncommitted Time"? That can certainly play a role in the size of the FlowFiles that are output.
>
> Thanks
> -Mark
>
> On Jun 22, 2020, at 2:48 AM, josef.zahn...@swisscom.com wrote:
>
> Hi Mark,
>
> Thanks a lot for your explanation, makes complete sense! Did you also check the "Max Poll Records" parameter? Because no matter how high I set it, I always get a random number of records back in one flowfile. The maximum is about 400 records, which isn't ideal for small records, as NiFi ends up with a lot of flowfiles of a few kilobytes in case of a huge backlog.
>
> Cheers Josef
>
> From: Mark Payne <marka...@hotmail.com>
> Reply to: "users@nifi.apache.org" <users@nifi.apache.org>
> Date: Friday, 19 June 2020 at 17:06
> To: "users@nifi.apache.org" <users@nifi.apache.org>
> Subject: Re: ConsumeKafkaRecord Performance Issue
>
> Josef,
>
> Glad you were able to get past this hurdle. The reason for the consumer yielding is a bit complex. NiFi issues an async request to Kafka to retrieve messages.
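Mark's point about Max Poll Records being an upper bound, not a guaranteed batch size, can be sketched as a toy model (plain Python, invented function name, not the actual Kafka client code): the cap only limits how many records one poll may return; a poll answered early, or a backlog that doesn't divide evenly, produces smaller batches.

```python
# Toy model: max.poll.records caps how many messages a single poll()
# may return; it does not pad polls up to that size.
def poll_batches(backlog, max_poll_records):
    """Split a backlog into poll-sized batches, never exceeding the cap."""
    batches = []
    for i in range(0, len(backlog), max_poll_records):
        batches.append(backlog[i:i + max_poll_records])
    return batches

# 1050 pending records with max.poll.records = 500: two full polls and
# one partial batch of 50. In practice timing effects (how much data the
# broker had ready when the fetch was answered) shrink batches further.
batches = poll_batches(list(range(1050)), max_poll_records=500)
```

This is why setting the property to 100’000 still yields small FlowFiles: the cap is an upper bound, and what actually arrives per poll is driven by what the broker returns in time.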
> Then, NiFi performs a long-poll to get those messages from the Kafka client. If the client returns 0 messages from the long-poll, NiFi assumes that there are no more messages available from Kafka, so it yields to avoid hammering the Kafka server constantly when there are no messages available. Unfortunately, though, I found fairly recently by digging into the Kafka client code that returning 0 messages happens not only when there are no messages available on the Kafka server but also when the client simply takes longer than that long-poll (10 milliseconds) to receive the response and prepare the messages on the client side. The client doesn't appear to readily expose any information about whether or not more messages are available, so this seems to be the best we can do with what the client currently provides.
>
> So setting a yield duration of 0 seconds will provide much higher throughput but may put more load on the Kafka brokers.
>
> On Jun 19, 2020, at 10:12 AM, josef.zahn...@swisscom.com wrote:
>
> Hi Mark, Pierre
>
> We are using NiFi 1.11.4, so fully up to date.
>
> Are you kidding me :-D, "Yield Duration" was always on the default value (1 sec), as I didn't expect the processor to "yield". But due to your comment I've changed it to "0 secs". I can't believe it: the performance has increased to the same value (about 250’000 messages per second) that kafka-consumer-perf-test.sh shows. Thanks a lot!! However, 250k messages/s is still not enough to cover all our use cases, but at least it is now consistent with the Kafka performance testing script. The Kafka Grafana dashboard shows about 60 MB/s outgoing at the current message rate.
>
> @Pierre: The setup you are referring to with 10-20 million messages per second: how many partitions did it have and how big were the messages? We are storing the messages in this example as Avro with about 44 fields.
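The size of the effect Mark describes can be seen with back-of-envelope arithmetic (toy Python; the batch size, poll time, and miss rate below are assumed for illustration, not measured): even an occasional spurious empty poll costs a full Yield Duration of idle time, which dwarfs the 10 ms polls around it.

```python
def avg_throughput(batch_records, poll_secs, miss_every_n, yield_secs):
    """Average records/sec over a cycle of (miss_every_n - 1) productive
    polls plus one empty poll that triggers a yield pause."""
    productive = (miss_every_n - 1) * batch_records
    busy = (miss_every_n - 1) * poll_secs      # time spent on productive polls
    idle = poll_secs + yield_secs              # the empty poll plus the yield
    return productive / (busy + idle)

# Assumed numbers: 400-record batches, 10 ms long-polls, and 1 spurious
# empty poll per 10 (the slow-client case Mark describes).
default_yield = avg_throughput(400, 0.010, 10, 1.0)  # Yield Duration = 1 sec
zero_yield = avg_throughput(400, 0.010, 10, 0.0)     # Yield Duration = 0 secs
# With these numbers, a single 1-second pause per cycle dominates the
# cycle time and cuts throughput by roughly an order of magnitude.
```

Under these assumptions the cycle with yields spends ~1.1 s to move 3’600 records, versus ~0.1 s without, which matches the dramatic improvement Josef saw after setting "0 secs".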
> Cheers Josef
>
> PS: Below some more information about my setup (even though our main issue has been solved):
>
> As record reader I'm using an AvroReader which gets the schema from a Confluent schema registry. Every setting there is at its default, except the connection parameters to Confluent. As record writer I'm using an AvroRecordSetWriter with a predefined schema, as we only want to keep a reduced column set.
>
> The 8 servers use only SAS SSDs and don't store the data. The data goes from ConsumeKafkaRecord directly into our DB, which runs on another cluster. As I mentioned already, the problem was there whether I was using "Primary Only" or distributing the load across the cluster, so it wasn't a limit of a single node.
>
> <image001.png>
>
> From: Mark Payne <marka...@hotmail.com>
> Reply to: "users@nifi.apache.org" <users@nifi.apache.org>
> Date: Friday, 19 June 2020 at 14:46
> To: "users@nifi.apache.org" <users@nifi.apache.org>
> Subject: Re: ConsumeKafkaRecord Performance Issue
>
> Josef,
>
> Have you tried updating the processor's Yield Duration (configure -> settings tab)? Setting that to "0 secs" can make a big difference in ConsumeKafka(Record)'s performance.
>
> Also, what kind of data rate (MB/sec) are you looking at, and which record reader and writer are you using? Are you using a schema registry? Spinning disk or SSD?
>
> All of these can make a big difference in performance.
>
> Thanks
> Mark
>
> On Jun 19, 2020, at 3:45 AM, "josef.zahn...@swisscom.com" <josef.zahn...@swisscom.com> wrote:
>
> Hi Chris
>
> Our brokers are running Kafka 2.3.0, just slightly different from the 2.3.1 of my kafka-consumer-perf-test.sh.
>
> I've now also tested with the performance shell script from Kafka 2.0.0; it showed the same result as with 2.3.1.
> In my eyes at least 100k messages/s should easily be possible, especially with the number of threads NiFi can use… As we have sources which generate about 300k to 400k messages/s, NiFi is at the moment far too slow to even consume in real time, and it gets even worse: once we are behind the offset we can't catch up anymore.
>
> At the moment we can't use NiFi to consume from Kafka.
>
> Cheers Josef
>
> From: "christophe.mon...@post.ch" <christophe.mon...@post.ch>
> Reply to: "users@nifi.apache.org" <users@nifi.apache.org>
> Date: Friday, 19 June 2020 at 08:54
> To: "users@nifi.apache.org" <users@nifi.apache.org>
> Subject: RE: ConsumeKafkaRecord Performance Issue
>
> Hi Josef
>
> I noticed that you run the kafka-consumer-perf-test.sh of Kafka 2.3.1, but NiFi is bundled with kafka-clients-2.0.0.jar. Maybe you could try the performance test with the same client version?
>
> What is the version of your Kafka brokers?
>
> Regards
> Chris
>
> From: josef.zahn...@swisscom.com <josef.zahn...@swisscom.com>
> Sent: Friday, 19 June 2020 07:55
> To: users@nifi.apache.org
> Subject: ConsumeKafkaRecord Performance Issue
>
> Hi guys,
>
> We have noticed some strange behavior of the ConsumeKafkaRecord processor (and its pendant ConsumeKafka). We have a Kafka topic with 15 partitions and a producer which inserts, via NiFi, at peak about 40k records per second into the topic. The thing is, it doesn't matter whether we use the 8-node cluster or configure execution on "Primary Node"; the performance is terrible. We made a test with execution on "Primary Node" and started with one thread; you can see the result below. As soon as we reached 3 threads the performance flattened out and never went higher than that, no matter how many threads or cluster nodes. We tried 2 threads on each node of the 8-node cluster (16 threads in total) and even more.
> It didn't help; we were stuck at 12’000’000 to 14’000’000 records per 5 minutes (so roughly 45k records per second). Btw, for the tests we were always behind the offset, so there were plenty of messages in the Kafka queue.
>
> <image001.png>
>
> We also tested with the performance script which comes with Kafka. It showed 250k messages/s without any tuning at all (though without any decoding of the messages, of course). So in theory Kafka and the network in between can't be the culprit; it must be something within NiFi.
>
> [user@nifi ~]$ /opt/kafka_2.12-2.3.1/bin/kafka-consumer-perf-test.sh --broker-list kafka.xyz.net:9093 --group nifi --topic events --consumer.config /opt/sbd_kafka/credentials_prod/client-ssl.properties --messages 3000000
>
> start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
> 2020-06-15 17:20:05:273, 2020-06-15 17:20:20:429, 515.7424, 34.0289, 3000000, 197941.4093, 3112, 12044, 42.8215, 249086.6822
>
> We have also seen that "Max Poll Records" in our case never gets reached; we had at most about 400 records in one flowfile even though we configured 100’000, which could be part of the problem.
>
> <image002.png>
>
> It seems I'm not alone with my issue, even though his performance was even worse than ours:
> https://stackoverflow.com/questions/62104646/nifi-poor-performance-of-consumekafkarecord-2-0-and-consumekafka-2-0
>
> Any help would be really appreciated.
>
> If nobody has an idea I'll have to open a bug ticket :-(.
>
> Cheers, Josef
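As a quick cross-check of the figures quoted in the thread (pure arithmetic in Python, no new measurements), the perf-test output is internally consistent, and the NiFi plateau works out to the ~45k records/s Josef mentions:

```python
# NiFi plateau: 12-14 million records per 5-minute (300 s) window.
nifi_low = 12_000_000 / 300    # 40'000 records/sec
nifi_high = 14_000_000 / 300   # ~46'667 records/sec

# kafka-consumer-perf-test.sh output: 3'000'000 messages,
# fetch.time.ms = 12044, rebalance.time.ms = 3112.
fetch_rate = 3_000_000 / (12044 / 1000)             # fetch.nMsg.sec ~ 249'087
overall_rate = 3_000_000 / ((3112 + 12044) / 1000)  # nMsg.sec ~ 197'941

# Data rate: 515.7424 MB over the full run, matching fetch.MB.sec ~ 42.8.
fetch_mb_rate = 515.7424 / (12044 / 1000)
```

So a single bare consumer fetched roughly 5-6x faster than the whole NiFi flow's plateau, which supports the conclusion that the bottleneck was inside NiFi rather than in Kafka or the network.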