James Cheng created KAFKA-6088:
----------------------------------

             Summary: Kafka Consumer slows down when reading from highly 
compacted topics
                 Key: KAFKA-6088
                 URL: https://issues.apache.org/jira/browse/KAFKA-6088
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.2.1
            Reporter: James Cheng
             Fix For: 0.11.0.0


Summary of the issue
-----
We found a performance issue with the Kafka Consumer where it gets less 
efficient if you have frequent gaps in offsets (which happens when there is 
lots of compaction on the topic).

The issue is present in 0.10.2.1 and possibly prior.

It is fixed in 0.11.0.0.

Summary of cause
-----
The fetcher code assumes that there will be no gaps in message offsets. If there is a 
gap, it does an additional round trip to the broker. For topics with frequent gaps in 
offsets, it is possible that most calls to {{poll()}} will trigger a round trip to the 
broker.

Background and details 
-----
We have a topic with roughly 8 million records. The topic is log compacted. It 
turns out that most of the initial records in the topic were never overwritten, 
whereas in the 2nd half of the topic we had lots of overwritten records. That 
means that for the first part of the topic, there are no gaps in offsets. But 
in the 2nd part of the topic, there are frequent gaps in the offsets (due to 
records being compacted away).

We have a consumer that starts up and reads the entire topic from beginning to 
end. We noticed that the consumer would read through the first part of the 
topic very quickly. When it got to the part of the topic with frequent gaps in 
offsets, consumption rate slowed down dramatically. This slowdown was 
consistent across multiple runs.

What is happening is this:
1) A call to {{poll()}} happens. The consumer fetches 1MB of data from the broker 
(the default of {{max.partition.fetch.bytes}}). It then returns just 500 records to 
the caller (the default of {{max.poll.records}}), and keeps the rest of the data in 
memory to use in future calls to {{poll()}}. 
2) Before returning the 500 records, the consumer library records the *next* offset 
it should return. It does so by taking the offset of the last record it returned (the 
500th record in the set) and adding 1 to it. It calls this the {{nextOffset}}. 
3) The application finishes processing the 500 messages and makes another call to 
{{poll()}}. During this call, the consumer library does a sanity check: it checks 
that the first message of the set *it is about to return* has an offset that matches 
the value of {{nextOffset}}. That is, it checks whether the 501st record has an 
offset that is exactly 1 greater than that of the 500th record.
        a. If it matches, then it returns an additional 500 records, and 
increments the {{nextOffset}} to (offset of the 1000th record, plus 1)
        b. If it doesn't match, then it throws away the remainder of the 1MB of 
data that it stored in memory in step 1, and it goes back to the broker to 
fetch an additional 1MB of data, starting at the offset {{nextOffset}}.

In topics that have no gaps (i.e. non-compacted topics), the code will always hit 
code path 3a.
If the topic has gaps in offsets and the call to {{poll()}} happens to fall 
onto a gap, then the code will hit code path 3b.

If the gaps are frequent, then it will frequently hit code path 3b.

The worst case is when you have a large number of gaps and you run with 
{{max.poll.records=1}}. Every gap will result in a new fetch to the broker, and you 
may end up processing only one message per fetch. Said another way, you can end up 
doing a single fetch for every single message in the partition.
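
To make this concrete, here is a small standalone simulation of the behavior 
described above. It is not Kafka code; the class, its fields, and the record counts 
are invented for illustration. It models a partition where every even offset has been 
compacted away, and counts how many broker fetches the offset check would cause:

{code}
import java.util.ArrayList;
import java.util.List;

// Toy model of the behavior described above. This is NOT Kafka code; it only
// simulates a partition where every even offset was compacted away and counts
// how many "broker fetches" the 0.10.x-style offset check would cause.
public class CompactedGapSimulation {

    public static void main(String[] args) {
        // Surviving offsets after compaction: 1, 3, 5, ..., 1999 (1000 records).
        List<Long> log = new ArrayList<>();
        for (long offset = 1; offset < 2000; offset += 2)
            log.add(offset);

        int maxPollRecords = 1;     // the worst case described above
        int recordsPerFetch = 500;  // stand-in for a 1MB fetch

        long position = 0;          // next offset the consumer expects to return
        long consumed = 0;
        int fetches = 0;

        List<Long> buffered = new ArrayList<>();  // data kept in memory from the last fetch
        long bufferedStart = -1;                  // offset the buffered data is positioned at

        while (consumed < log.size()) {
            if (buffered.isEmpty()) {
                // Step 1: fetch a batch from the "broker" starting at the current position.
                buffered = fetchFrom(log, position, recordsPerFetch);
                bufferedStart = position;         // a fresh fetch starts at the requested offset
                fetches++;
            } else if (bufferedStart != position) {
                // Step 3b: the buffered data does not line up with nextOffset,
                // so it is thrown away and the next iteration refetches.
                buffered.clear();
            } else {
                // Step 3a: offsets line up; hand up to max.poll.records to the application.
                int n = Math.min(maxPollRecords, buffered.size());
                List<Long> returned = new ArrayList<>(buffered.subList(0, n));
                buffered.subList(0, n).clear();
                consumed += n;
                // Step 2: nextOffset = offset of the last returned record, plus 1.
                position = returned.get(n - 1) + 1;
                // The remaining buffered data is now positioned at its next real offset,
                // which differs from 'position' whenever there is a gap.
                bufferedStart = buffered.isEmpty() ? position : buffered.get(0);
            }
        }

        System.out.println("records consumed: " + consumed);
        System.out.println("broker fetches:   " + fetches);
    }

    // Pretend broker: return up to 'count' surviving offsets at or after 'fromOffset'.
    private static List<Long> fetchFrom(List<Long> log, long fromOffset, int count) {
        List<Long> batch = new ArrayList<>();
        for (long offset : log)
            if (offset >= fromOffset && batch.size() < count)
                batch.add(offset);
        return batch;
    }
}
{code}

In this toy model, {{max.poll.records=1}} results in one fetch per surviving record 
(1000 fetches for 1000 records), while {{max.poll.records=500}} brings it down to 2 
fetches, which lines up with the mitigation described at the end of this report.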


Repro
-----

We created a repro. It appears that the bug is present in 0.10.2.1 but was fixed in 
0.11. I've attached a tarball with all the code and instructions. 

The repro is:
1) Create a single partition topic with log compaction turned on 
2) Write messages with the following keys: 1 1 2 2 3 3 4 4 5 5 ... (each 
message key written twice in a row) 
3) Let compaction happen. This would mean that offsets 0 2 4 6 8 10 ... would be 
compacted away 
4) Consume from this topic with {{max.poll.records=1}}

More concretely, here is the producer code:
{code}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// props is assumed to contain bootstrap.servers and the String serializers.
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000000; i++) {
    // Write each key twice so that compaction removes the first copy,
    // leaving a gap at every even offset.
    producer.send(new ProducerRecord<String, String>("compacted",
            Integer.toString(i), Integer.toString(i)));
    producer.send(new ProducerRecord<String, String>("compacted",
            Integer.toString(i), Integer.toString(i)));
}
producer.flush();
producer.close();
{code}
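
For reference, the consumer side of the repro could look something like the snippet 
below. This is a sketch in the same style as the producer code above, not the exact 
code from the attached tarball; {{props}} is assumed to set {{bootstrap.servers}}, 
{{group.id}}, the String deserializers, {{auto.offset.reset=earliest}}, and 
{{max.poll.records=1}}:

{code}
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch of a repro consumer; the actual code is in the attached tarball.
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singletonList("compacted"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
}
{code}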


When consuming with a 0.10.2.1 consumer, you can see this pattern (with Fetcher 
logs at DEBUG, see file consumer_0.10.2/debug.log):

{code}
offset = 1, key = 0, value = 0
22:58:51.262 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 3 since the current position is 2
22:58:51.263 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 3, key = 1, value = 1
22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 5 since the current position is 4
22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 5, key = 2, value = 2
22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 7 since the current position is 6
22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 7, key = 3, value = 3
22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 9 since the current position is 8
22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 9, key = 4, value = 4
22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 11 since the current position is 10
22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 11, key = 5, value = 5
22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 13 since the current position is 12
22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
offset = 13, key = 6, value = 6
22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 15 since the current position is 14
22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
{code}


When consuming with a 0.11.0.1 consumer, you can see the following pattern (see file 
consumer_0.11/debug.log): 
{code}
offset = 1, key = 0, value = 0 
offset = 3, key = 1, value = 1 
offset = 5, key = 2, value = 2 
offset = 7, key = 3, value = 3 
offset = 9, key = 4, value = 4 
offset = 11, key = 5, value = 5 
offset = 13, key = 6, value = 6 
offset = 15, key = 7, value = 7 
offset = 17, key = 8, value = 8 
offset = 19, key = 9, value = 9 
offset = 21, key = 10, value = 10 
{code}

From looking at the GitHub history, it appears it was fixed in 
https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0

Specifically, this line 
https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0L930

was replaced by this line: 
https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0R933

Mitigation
-----
This problem is fixed in 0.11.0.0. If you can upgrade to 0.11.0.0, then you 
will not be affected by the problem.

If you cannot upgrade to 0.11.0.0, then you can reduce the impact of this by 
increasing the value of {{max.poll.records}}. This works because the check happens 
on each call to {{poll()}}, and increasing the value of {{max.poll.records}} reduces 
the number of calls to {{poll()}}.
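
For example, the mitigation is just a consumer configuration change. In the sketch 
below, the broker address, group id, and the value 2000 are placeholders; the right 
value depends on your workload:

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Pre-0.11 mitigation: raise max.poll.records so fewer poll() calls hit the
// offset check. The value 2000 is only an example; tune it for your workload.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
{code}

Note that with a larger {{max.poll.records}}, each call to {{poll()}} hands more 
records to the application at once, so you may also need to allow for the longer 
processing time between polls (e.g. via {{max.poll.interval.ms}} on 0.10.1+).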



