ConsumeKafka just takes the messages as raw bytes and writes the content to
disk. ConsumeKafkaRecord serializes/deserializes the data according to a
schema and possibly converts it into another format. A large difference in
throughput and CPU consumption between the two is therefore normal.
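
To give a feel for why the record path costs more, here is a minimal sketch.
It is not NiFi code: JSON from the Python standard library stands in for Avro,
and the function names, the field names and the message count are all made up
for illustration. Only the 44 fields per record are taken from this thread.

```python
import json
import time


def pass_through(messages):
    """Mimic a raw consumer: join payload bytes with a demarcator, no parsing."""
    return b"\n".join(messages)


def record_path(messages, keep_fields):
    """Mimic a record-oriented consumer: decode, project fields, re-encode."""
    out = []
    for raw in messages:
        record = json.loads(raw)                        # reader: bytes -> record
        reduced = {k: record[k] for k in keep_fields}   # reduced writer schema
        out.append(json.dumps(reduced).encode())        # writer: record -> bytes
    return b"\n".join(out)


def make_messages(n, fields=44):
    """Build n identical messages with the given number of fields (hypothetical data)."""
    record = {f"f{i}": i for i in range(fields)}
    payload = json.dumps(record).encode()
    return [payload] * n


if __name__ == "__main__":
    msgs = make_messages(50_000)
    timings = (
        ("pass_through", lambda: pass_through(msgs)),
        ("record_path", lambda: record_path(msgs, ["f0", "f1", "f2"])),
    )
    for name, fn in timings:
        start = time.perf_counter()
        fn()
        print(f"{name}: {time.perf_counter() - start:.3f} s")
```

The absolute timings depend on the machine and say nothing about Avro itself;
the point is only that the second path touches every field of every message.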

Regarding the number of messages per flow file, IIRC, I always had a good
number of messages per flow file and a consistent number of messages across
flow files (in the use case I mentioned earlier in this thread). It was a
long time ago, but I think I added a dynamic property to set a Kafka client
property; I can't remember which one. I'm trying to find out.

Pierre

On Mon, 22 Jun 2020 at 17:52, <josef.zahn...@swisscom.com> wrote:

> Hi Mark,
>
>
>
> It really doesn’t matter what I configure for “Max Poll Records” (tried
> 10, 1’000, 100’000) or for “Max Uncommitted Time” (tried 1s, 10s, 100s).
> The flowfile size always varies randomly between 1 and about 500 records,
> so those two parameters don’t work at all in my case. The theory behind
> them is clear, but they don’t behave as expected… Of course, the queue was
> more than full.
>
>
>
> In the meantime I’ve done some tests with the ConsumeKafka processor, and
> the performance difference is again huge: 4 times better performance (1
> million messages per second) with the same topic and number of threads for
> the non-record processor, at which point the network limit is reached. It
> seems that the RecordReader/RecordWriter part of the ConsumeKafkaRecord
> processor consumes a lot of CPU power. Interesting that nobody has
> complained about it until now; are we that high with a few 100’000 messages
> per second? We have sources which produce about 200’000 messages/s, and we
> would like to consume that a few times faster than it is produced.
>
>
>
> We now plan to implement a KafkaAvroConsumer, which is based on the
> ConsumeKafka processor. It will consume from Kafka and write Avro out
> instead of the plain messages with a demarcator. We hope to get the same
> great performance as with the ConsumeKafka processor.
>
>
>
> Cheers Josef
>
>
>
> *From: *Mark Payne <marka...@hotmail.com>
> *Reply to: *"users@nifi.apache.org" <users@nifi.apache.org>
> *Date: *Monday, 22 June 2020 at 15:03
> *To: *"users@nifi.apache.org" <users@nifi.apache.org>
> *Subject: *Re: ConsumeKafkaRecord Performance Issue
>
>
>
> Josef,
>
>
>
> The Max Poll Records just gets handed to the Kafka client and limits the
> size of how much should be pulled back in a single request. This can be
> important because if you set the value really high, you could potentially
> buffer up a lot of messages in memory and consume a lot of heap. But
> setting it too low would result in small batches. So that property can play
> a role in the size of a batch of records, but you should not expect to see
> batches output that are necessarily equal to the value there.
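
A small sketch of why the setting acts as an upper bound rather than a target.
This is a toy model, not Kafka client code: it assumes the client simply hands
back whatever has already arrived in its buffer, capped at the configured
maximum, and the arrival numbers are invented.

```python
def poll(buffered, max_poll_records):
    """Return (records handed out, records left in the buffer) for one poll."""
    handed_out = min(buffered, max_poll_records)
    return handed_out, buffered - handed_out


def batch_sizes(arrivals, max_poll_records):
    """arrivals[i] = records that reached the client-side buffer before poll i."""
    buffered = 0
    sizes = []
    for arrived in arrivals:
        buffered += arrived
        handed_out, buffered = poll(buffered, max_poll_records)
        sizes.append(handed_out)
    return sizes


if __name__ == "__main__":
    arrivals = [400, 120, 0, 350]            # hypothetical arrival pattern
    print(batch_sizes(arrivals, 100_000))    # cap never reached
    print(batch_sizes(arrivals, 100))        # cap reached on every poll
```

With a very high cap the batch sizes just follow the arrival pattern, which
would match the "random" sizes reported here; only a low cap makes them
uniform. Whether this fully explains the FlowFile sizes is a separate question.
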
>
>
>
> What value do you have set for the “Max Uncommitted Time”? That can
> certainly play a role in the size of the FlowFiles that are output.
>
>
>
> Thanks
>
> -Mark
>
>
>
>
>
> On Jun 22, 2020, at 2:48 AM, josef.zahn...@swisscom.com wrote:
>
>
>
> Hi Mark,
>
>
>
> Thanks a lot for your explanation, it makes complete sense! Did you also
> check the “Max Poll Records” parameter? No matter how high I set it, I
> always get a random number of records back in one flowfile. The maximum is
> about 400 records, which isn’t ideal for small records, as NiFi gets a lot
> of flowfiles of only a few kilobytes when there is a huge backlog.
>
>
>
> Cheers Josef
>
>
>
>
>
> *From: *Mark Payne <marka...@hotmail.com>
> *Reply to: *"users@nifi.apache.org" <users@nifi.apache.org>
> *Date: *Friday, 19 June 2020 at 17:06
> *To: *"users@nifi.apache.org" <users@nifi.apache.org>
> *Subject: *Re: ConsumeKafkaRecord Performance Issue
>
>
>
> Josef,
>
>
>
> Glad you were able to get past this hurdle. The reason for the consumer
> yielding is a bit complex. NiFi issues an async request to Kafka to
> retrieve messages. Then, NiFi performs a long-poll to get those messages
> from the Kafka client. If the client returns 0 messages from the long-poll,
> the assumption that NiFi makes is that there are no more messages available
> from Kafka. So it yields to avoid hammering the Kafka server constantly
> when there are no messages available. Unfortunately, though, I have found
> fairly recently by digging into the Kafka client code that returning 0
> messages happens not only when there are no messages available on the Kafka
> server but also if the client just takes longer than that long-poll (10
> milliseconds) to receive the response and prepare the messages on the
> client side. The client doesn’t appear to readily expose any information
> about whether or not there are more messages available, so this seems to be
> the best we can do with what the client currently provides.
>
>
>
> So setting a yield duration of 0 seconds will provide much higher
> throughput but may put more load on the Kafka brokers.
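
A back-of-the-envelope model of the yield effect described above. It is a
simplification, not the processor's actual scheduling: it assumes every poll
takes the full 10 ms long-poll and that every empty poll costs one full yield.
The poll pattern is invented; only the 10 ms long-poll and the 1 sec default
yield come from this thread.

```python
def simulate(poll_results, poll_ms=10, yield_ms=1000):
    """Return (records consumed, elapsed milliseconds) for a series of polls.

    poll_results[i] is the number of records returned by poll i. A 0 stands
    for an empty poll, whether the topic was drained or the response was late.
    """
    consumed = 0
    elapsed_ms = 0
    for returned in poll_results:
        elapsed_ms += poll_ms        # the long-poll itself
        if returned == 0:
            elapsed_ms += yield_ms   # processor yields after an empty poll
        consumed += returned
    return consumed, elapsed_ms


def records_per_second(poll_results, **kwargs):
    """Average throughput over the whole series of polls."""
    consumed, elapsed_ms = simulate(poll_results, **kwargs)
    return consumed * 1000 / elapsed_ms


if __name__ == "__main__":
    polls = [400, 0, 400, 400, 0] * 20   # hypothetical: 2 of 5 polls come back empty
    print(records_per_second(polls, yield_ms=1000))
    print(records_per_second(polls, yield_ms=0))
```

In this model the occasional empty poll dominates the elapsed time as long as
the yield is much longer than the poll, which is why a yield of 0 helps so much
when there is a backlog.
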
>
>
>
>
> On Jun 19, 2020, at 10:12 AM, josef.zahn...@swisscom.com wrote:
>
>
>
> Hi Mark, Pierre
>
>
>
> We are using NiFi 1.11.4, so fully up to date.
>
>
>
> Are you kidding me :-D, “Yield Duration” was always at the default value
> (1 sec), as I didn’t expect the processor to “yield”. But following your
> comment I’ve changed it to “0 secs”. I can’t believe it, the performance
> has increased to the same value (about 250’000 messages per second) as
> kafka-consumer-perf-test.sh shows. Thanks a lot!! However, 250k messages
> per second is still not enough to cover all our use cases, but at least it
> is now consistent with the Kafka performance testing script. The Kafka
> Grafana dashboard shows about 60MB/s outgoing with the current number of
> messages.
>
>
>
> @Pierre: Regarding the setup you are referring to with 10-20 million
> messages per second: how many partitions did it have and how big were the
> messages? We are storing the messages in this example as Avro with about 44
> fields.
>
>
>
> Cheers Josef
>
>
>
>
>
> PS: below some more information about my setup (even though our main issue
> has been solved):
>
>
>
> As record reader I’m using an AvroReader which gets the schema from a
> Confluent schema registry. Every setting there is default, except the
> connection parameters to Confluent. As record writer I’m using
> AvroRecordSetWriter with a predefined schema, as we only want to have a
> reduced column set.
>
>
>
> The 8 servers use only SAS SSDs and don’t store the data. The data goes
> from ConsumeKafkaRecord directly into our DB, which runs on another
> cluster. As I already mentioned, the problem was there whether I was using
> “Primary Only” or distributing it across the cluster, so it wasn’t a limit
> on a single node.
>
>
>
> <image001.png>
>
>
>
>
>
>
>
> *From: *Mark Payne <marka...@hotmail.com>
> *Reply to: *"users@nifi.apache.org" <users@nifi.apache.org>
> *Date: *Friday, 19 June 2020 at 14:46
> *To: *"users@nifi.apache.org" <users@nifi.apache.org>
> *Subject: *Re: ConsumeKafkaRecord Performance Issue
>
>
>
> Josef,
>
>
>
> Have you tried updating the processor’s Yield Duration (configure ->
> settings tab)? Setting that to “0 secs” can make a big difference in
> ConsumeKafka(Record)’s performance.
>
>
>
> Also, what kind of data rate (MB/sec) are you looking at, which record
> reader and writer are you using? Are you using a schema registry? Spinning
> disk or ssd?
>
>
>
> All of these can make a big difference in performance.
>
>
>
> Thanks
>
> Mark
>
>
>
>
>
> On Jun 19, 2020, at 3:45 AM, "josef.zahn...@swisscom.com" <
> josef.zahn...@swisscom.com> wrote:
>
> Hi Chris
>
>
>
> Our brokers are using Kafka 2.3.0, just slightly different from the version
> of my kafka-consumer-perf-test.sh.
>
>
>
> I’ve now tested as well with the performance shell script from kafka
> 2.0.0, it showed the same result as with 2.3.1.
>
>
>
> In my eyes at least 100k messages/s should easily be possible, especially
> with the number of threads NiFi has… As we have sources which generate
> about 300k to 400k messages/s, NiFi is at the moment far too slow to even
> consume in real time, and it gets even worse if we fall behind the offset:
> we can’t catch up anymore.
>
>
>
> At the moment we can’t use NiFi to consume from Kafka.
>
>
>
> Cheers Josef
>
>
>
>
>
> *From: *"christophe.mon...@post.ch" <christophe.mon...@post.ch>
> *Reply to: *"users@nifi.apache.org" <users@nifi.apache.org>
> *Date: *Friday, 19 June 2020 at 08:54
> *To: *"users@nifi.apache.org" <users@nifi.apache.org>
> *Subject: *RE: ConsumeKafkaRecord Performance Issue
>
>
>
> Hi Josef
>
>
>
> I noticed that you run kafka-consumer-perf-test.sh of Kafka 2.3.1 but NiFi
> is bundled with kafka-clients-2.0.0.jar
>
> Maybe you could try the performance test with the same client version?
>
>
>
> What is the version of your kafka brokers?
>
>
>
> Regards
>
> Chris
>
>
>
> *From:* josef.zahn...@swisscom.com <josef.zahn...@swisscom.com>
> *Sent:* Friday, 19. June 2020 07:55
> *To:* users@nifi.apache.org
> *Subject:* ConsumeKafkaRecord Performance Issue
>
>
>
> Hi guys,
>
>
>
> We have faced a strange behavior of the ConsumeKafkaRecord processor (and
> its counterpart ConsumeKafka). We have a Kafka topic with 15 partitions and
> a producer which inserts, via NiFi, about 40k records per second at peak
> into the topic. The thing is, it doesn’t matter whether we are using the
> 8-node cluster or configuring execution on “Primary Node”, the performance
> is terrible. We made a test with execution on “Primary Node” and started
> with one thread; you can see the result below. As soon as we reached 3
> threads the performance went down and never went higher than that, no
> matter how many threads or cluster nodes we used. We tried 2 threads in the
> 8-node cluster (16 threads in total) and even more. It didn’t help; we were
> stuck at 12’000’000 – 14’000’000 records per 5 min (so roughly 45k records
> per second). Btw., for the tests we were always behind the offset, so there
> were a lot of messages in the Kafka queue.
>
>
>
> <image001.png>
>
>
>
>
>
> We also tested with the performance script which comes with kafka. It
> showed 250k messages/s without any tuning at all (however without any
> decoding of the messages of course). So in theory kafka and the network in
> between couldn’t be the culprit. It must be something within NiFi.
>
>
>
> [user@nifi ~]$ /opt/kafka_2.12-2.3.1/bin/kafka-consumer-perf-test.sh \
>     --broker-list kafka.xyz.net:9093 --group nifi --topic events \
>     --consumer.config /opt/sbd_kafka/credentials_prod/client-ssl.properties \
>     --messages 3000000
>
>
>
> start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
> 2020-06-15 17:20:05:273, 2020-06-15 17:20:20:429, 515.7424, 34.0289, 3000000, 197941.4093, 3112, 12044, 42.8215, 249086.6822
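
For anyone who wants to check the rates in that report, here is a small sketch
that recomputes them from the two report lines. The helper names are made up;
the only input is the header and data row quoted above.

```python
import csv
import io
from datetime import datetime

# Header and data row as printed by kafka-consumer-perf-test.sh in this thread.
REPORT = (
    "start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, "
    "nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec\n"
    "2020-06-15 17:20:05:273, 2020-06-15 17:20:20:429, 515.7424, 34.0289, "
    "3000000, 197941.4093, 3112, 12044, 42.8215, 249086.6822\n"
)


def parse_report(text):
    """Map each header column to the value in the data row."""
    reader = csv.reader(io.StringIO(text), skipinitialspace=True)
    header = next(reader)
    row = next(reader)
    return dict(zip(header, row))


def summarize(report):
    """Recompute messages per second over wall-clock time and over fetch time."""
    fmt = "%Y-%m-%d %H:%M:%S:%f"
    start = datetime.strptime(report["start.time"], fmt)
    end = datetime.strptime(report["end.time"], fmt)
    wall_s = (end - start).total_seconds()
    fetch_s = int(report["fetch.time.ms"]) / 1000
    messages = int(report["data.consumed.in.nMsg"])
    return {
        "wall_s": wall_s,
        "msgs_per_s_wall": messages / wall_s,
        "msgs_per_s_fetch": messages / fetch_s,
    }


if __name__ == "__main__":
    for key, value in summarize(parse_report(REPORT)).items():
        print(f"{key}: {value:.2f}")
```

The roughly 250k messages/s mentioned in this thread is the fetch-time rate;
over the whole wall-clock run the rate is closer to 198k messages/s.
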
>
>
>
>
>
> We have also seen that “Max Poll Records” never gets reached in our case:
> we had at most about 400 records in one flowfile even though we configured
> 100’000, which could be part of the problem.
>
>
>
> <image002.png>
>
>
>
> Seems that I’m not alone with my issue, even though his performance was
> even worse than ours:
>
>
> https://stackoverflow.com/questions/62104646/nifi-poor-performance-of-consumekafkarecord-2-0-and-consumekafka-2-0
>
>
>
> Any help would be really appreciated.
>
>
>
> If nobody has an idea I have to open a bug ticket :-(.
>
>
>
> Cheers, Josef
>
>
>
