Using Kafka 2.3.1 Brokers makes things worse: there are now two fetch.max.wait.ms delays (roughly 500 ms each in the log below) before messages are delivered, even though they were available from the beginning.
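To size the two delays, the Java sketch below subtracts timestamps copied from the log that follows. It uses only java.time, so it runs without a Kafka dependency; the class name FetchGaps is hypothetical. Both gaps come out close to the fetch.max.wait.ms=500 setting mentioned later in the thread. That fits the reading that the broker holds each request for the full wait because Ledger-0 has no data, but it does not prove it.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: measures gaps between timestamps copied from the 2.3.1 log.
class FetchGaps {
    // The log prints milliseconds after a comma, e.g. "11:40:23,878".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    // Milliseconds between two timestamps on the same day.
    static long gapMillis(String from, String to) {
        LocalTime start = LocalTime.parse(from, LOG_TIME);
        LocalTime end = LocalTime.parse(to, LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // Ledger-1 records are ignored; the request that follows covers only Ledger-0.
        String ignored = "11:40:23,878";
        // Next request, again for Ledger-0 only.
        String secondRequest = "11:40:24,382";
        // Ledger-1 (offset 28) is included in the fetch again.
        String refetch = "11:40:24,885";
        // The same records at offset 28 are returned.
        String delivered = "11:40:24,887";

        System.out.println("first wait:  " + gapMillis(ignored, secondRequest) + " ms");
        System.out.println("second wait: " + gapMillis(secondRequest, refetch) + " ms");
        System.out.println("total delay: " + gapMillis(ignored, delivered) + " ms");
    }
}
```

Run as-is, it prints 504 ms and 503 ms for the two waits and 1009 ms from the ignored fetch to delivery.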
2020-03-09 11:40:23,878 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data (error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=280)
2020-03-09 11:40:23,878 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Ignoring fetched records for partition Ledger-1 since it no longer has valid position
2020-03-09 11:40:23,878 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:23,878 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(Ledger-1), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,382 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,382 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,382 DEBUG [org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Handling OffsetsForLeaderEpoch response for Ledger-1. Got offset 29 for epoch 0
2020-03-09 11:40:24,885 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,885 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-1 at position FetchPosition{offset=28, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,885 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), toForget=(), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,887 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data (error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=280)
2020-03-09 11:40:24,889 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,889 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added READ_UNCOMMITTED fetch request for partition Ledger-1 at position FetchPosition{offset=29, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,889 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'EE-ManagedThreadFactory-default-Thread-2' [Consumer clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), toForget=(), implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)

> On 9/03/2020, at 10:48 AM, James Olsen <ja...@inaseq.com> wrote:
>
> Thanks for your response. Yes the second issue can be mitigated by reducing
> the fetch.max.wait.ms although reducing it too far creates excessive CPU load
> on the Brokers.
> However I've done some further testing and found what looks
> like the underlying cause.
>
> In the scenario below the Consumer is consuming from 2 Partitions (MyTopic-0
> and MyTopic-1). There is a cycle of messages being fetched and ignored. In
> each cycle a subsequent fetch to get them again does not occur until after a
> complete fetch.max.wait.ms expires. I suspect this is due initially to the
> fact that MyTopic-0 has never had any messages and hence has no epoch and
> subsequently is being fetched on its own - but being empty results in the
> delay. Someone who knows more about the meaning of "toSend=(),
> toForget=(MyTopic-1), implied=(MyTopic-0)" might be able to enlighten things
> further.
>
> I can post a more complete log of this if anyone wants to take a look.
>
> I’m going to try Kafka 2.3 Brokers to see if the "Skipping validation …" bit
> has any impact.
>
> 2020-03-09 09:46:43,093 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch
> READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data
> (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0,
> preferredReadReplica = absent, abortedTransactions = null,
> recordsSizeInBytes=573)
>
> 2020-03-09 09:46:43,093 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Ignoring fetched
> records for partition MyTopic-1 since it no longer has valid position
>
> 2020-03-09 09:46:43,093 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added
> READ_UNCOMMITTED fetch request for partition MyTopic-0 at position
> FetchPosition{offset=0, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null),
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>
> 2020-03-09 09:46:43,093 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending
> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(MyTopic-1),
> implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
>
> 2020-03-09 09:46:43,095 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Skipping
> validation of fetch offsets for partitions [MyTopic-1] since the broker does
> not support the required protocol version (introduced in Kafka 2.3)
>
> 2020-03-09 09:46:43,597 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added
> READ_UNCOMMITTED fetch request for partition MyTopic-0 at position
> FetchPosition{offset=0, offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null),
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>
> 2020-03-09 09:46:43,597 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added
> READ_UNCOMMITTED fetch request for partition MyTopic-1 at position
> FetchPosition{offset=40, offsetEpoch=Optional[0],
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null),
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>
> 2020-03-09 09:46:43,597 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending
> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(MyTopic-1), toForget=(),
> implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
>
> 2020-03-09 09:46:43,599 DEBUG
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch
> READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data
> (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0,
> preferredReadReplica = absent, abortedTransactions = null,
> recordsSizeInBytes=573)
>
> On 5/03/2020, at 11:45 PM, M. Manna <manme...@gmail.com> wrote:
>
> Hi James,
>
> 3 Consumers in a group means you have 20 partitions per consumer (as
> per your 60 partition and 1 CGroup setup), 5 means 12. There's nothing
> special about these numbers as you also noticed.
> Have you tried setting fetch.max.wait.ms = 0 and seeing whether that makes
> a difference for you?
>
> Thanks,
>
> On Thu, 5 Mar 2020 at 03:43, James Olsen <ja...@inaseq.com> wrote:
>
> I’m seeing behaviour that I don’t understand when I have Consumers
> fetching from multiple Partitions of the same Topic. There are two
> different conditions arising:
>
> 1. A subset of the Partitions allocated to a given Consumer is not being
> consumed at all. The Consumer appears healthy, the Thread is running and
> logging activity and is successfully processing records from some of the
> Partitions it has been assigned. I don’t think this is due to the first
> Partition fetched filling a Batch (KIP-387). The problem does not occur if
> we have a particular number of Consumers (3 in this case) but it has failed
> with a range of other larger values. I don’t think there is anything
> special about 3 - it just happens to work OK with that value although it is
> the same as the Broker and Replica count. When we tried 6, 5 Consumers
> were fine but 1 exhibited this issue.
>
> 2. Up to a half second delay between Producer sending and Consumer
> receiving a message. This looks suspiciously like the fetch.max.wait.ms=500
> but we also have fetch.min.bytes=1 so should get messages as soon as
> something is available.
> The only explanation I can think of is if the
> fetch.max.wait.ms is applied in full to the first Partition checked and
> it remains empty for the duration. Then it moves on to a subsequent
> non-empty Partition and delivers messages from there.
>
> Our environment is AWS MSK (Kafka 2.2.1) and Kafka Java client 2.4.0.
>
> All environments appear healthy and under light load, e.g. clients
> operating at only 1-2% CPU, Brokers (3) at 5-10% CPU. No swap, no crashes,
> no dead threads etc.
>
> A typical scenario is a Topic with 60 Partitions, 3 Replicas and a single
> ConsumerGroup with 5 Consumers. The Partitioning is for semantic purposes,
> with the intention being to add more Consumers as the business grows and
> load increases. Some of the Partitions are always empty due to using short
> string keys and the default Partitioner - we will probably implement a
> custom Partitioner to achieve better distribution in the near future.
>
> I don’t have access to the detailed JMX metrics yet but am working on that
> in the hope it will help diagnose.
>
> Thoughts and advice appreciated!
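M. Manna's per-consumer figures follow from simple integer division. The Java sketch below assumes the consumers use the 2.4 Java client's default assignor (RangeAssignor) and subscribe to a single topic; RangeSplit and its methods are hypothetical names, not Kafka API. Consumers, in sorted member order, each receive a contiguous block of partitions, and when the count does not divide evenly the first consumers take one extra partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a contiguous "range" split of one topic's partitions,
// assuming the default RangeAssignor and no custom partition.assignment.strategy.
class RangeSplit {
    // How many partitions each consumer (in sorted member order) receives.
    static List<Integer> partitionCounts(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers; // first `withExtra` consumers get one more
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            counts.add(perConsumer + (i < withExtra ? 1 : 0));
        }
        return counts;
    }

    // Index of the first partition owned by consumer i.
    static int firstPartition(int numPartitions, int numConsumers, int i) {
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers;
        return perConsumer * i + Math.min(i, withExtra);
    }

    public static void main(String[] args) {
        int partitions = 60;
        for (int consumers : new int[] {3, 5, 6}) {
            StringBuilder line = new StringBuilder(consumers + " consumers:");
            List<Integer> counts = partitionCounts(partitions, consumers);
            for (int i = 0; i < consumers; i++) {
                int start = firstPartition(partitions, consumers, i);
                int end = start + counts.get(i) - 1;
                line.append(" [").append(start).append("-").append(end).append("]");
            }
            System.out.println(line);
        }
    }
}
```

For 60 partitions it prints one contiguous block per consumer: [0-19] [20-39] [40-59] for 3 consumers, blocks of 12 for 5 and blocks of 10 for 6, matching the figures in M. Manna's reply.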