Using 2.3.1 Brokers makes things worse.  There are now 2 fetch.max.wait.ms 
delays before messages are delivered even though they were available at the 
beginning.

2020-03-09 11:40:23,878 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch 
READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data 
(error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, 
preferredReadReplica = absent, abortedTransactions = null, 
recordsSizeInBytes=280)
2020-03-09 11:40:23,878 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Ignoring 
fetched records for partition Ledger-1 since it no longer has valid position
2020-03-09 11:40:23,878 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
FetchPosition{offset=0, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:23,878 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending 
READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(Ledger-1), 
implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,382 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
FetchPosition{offset=0, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,382 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending 
READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), 
implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,382 DEBUG 
[org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Handling 
OffsetsForLeaderEpoch response for Ledger-1. Got offset 29 for epoch 0
2020-03-09 11:40:24,885 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
FetchPosition{offset=0, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,885 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
READ_UNCOMMITTED fetch request for partition Ledger-1 at position 
FetchPosition{offset=28, offsetEpoch=Optional[0], 
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,885 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending 
READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), toForget=(), 
implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,887 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch 
READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data 
(error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, 
preferredReadReplica = absent, abortedTransactions = null, 
recordsSizeInBytes=280)
2020-03-09 11:40:24,889 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
FetchPosition{offset=0, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,889 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
READ_UNCOMMITTED fetch request for partition Ledger-1 at position 
FetchPosition{offset=29, offsetEpoch=Optional[0], 
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
2020-03-09 11:40:24,889 DEBUG 
[org.apache.kafka.clients.consumer.internals.Fetcher] 
'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending 
READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), toForget=(), 
implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)


> On 9/03/2020, at 10:48 AM, James Olsen <ja...@inaseq.com> wrote:
> 
> Thanks for your response.  Yes the second issue can be mitigated by reducing 
> the fetch.max.wait.ms although reducing it too far creates excessive CPU load 
> on the Brokers.  However I've done some further testing and found what looks 
> like the underlying cause.
> 
> In the scenario below the Consumer is consuming from 2 Partitions (MyTopic-0 
> and MyTopic-1).  There is a cycle of messages being fetched and ignored.  In 
> each cycle a subsequent fetch to get them again does not occur until after a 
> complete fetch.max.wait.ms expires.  I suspect this is due initially to the 
> fact that MyTopic-0 has never had any messages and hence has no epoch and 
> subsequently is being fetched on it’s own - but being empty results in the 
> delay.  Someone who knows more about the meaning of "toSend=(), 
> toForget=(MyTopic-1), implied=(MyTopic-0)” might be able to enlighten things 
> further.
> 
> I can post a more complete log of this if anyone wants to take a look.
> 
> I’m going to try Kafka 2.3 Brokers to see if the "Skipping validation …” bit 
> has any impact.
> 
> 2020-03-09 09:46:43,093 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch 
> READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data 
> (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0, 
> preferredReadReplica = absent, abortedTransactions = null, 
> recordsSizeInBytes=573)
> 
> 2020-03-09 09:46:43,093 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Ignoring fetched 
> records for partition MyTopic-1 since it no longer has valid position
> 
> 2020-03-09 09:46:43,093 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
> READ_UNCOMMITTED fetch request for partition MyTopic-0 at position 
> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 
> 2020-03-09 09:46:43,093 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending 
> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(MyTopic-1), 
> implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
> 
> 2020-03-09 09:46:43,095 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Skipping 
> validation of fetch offsets for partitions [MyTopic-1] since the broker does 
> not support the required protocol version (introduced in Kafka 2.3)
> 
> 2020-03-09 09:46:43,597 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
> READ_UNCOMMITTED fetch request for partition MyTopic-0 at position 
> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 
> 2020-03-09 09:46:43,597 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
> READ_UNCOMMITTED fetch request for partition MyTopic-1 at position 
> FetchPosition{offset=40, offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 
> 2020-03-09 09:46:43,597 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending 
> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(MyTopic-1), toForget=(), 
> implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
> 
> 2020-03-09 09:46:43,599 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch 
> READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data 
> (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0, 
> preferredReadReplica = absent, abortedTransactions = null, 
> recordsSizeInBytes=573)
> 
> 
> On 5/03/2020, at 11:45 PM, M. Manna 
> <manme...@gmail.com<mailto:manme...@gmail.com>> wrote:
> 
> Hi James,
> 
> 3 Consumers in a group means you are having 20 partitions per consumer (as
> per your 60 partition and 1 CGroup setup), 5 means 12. There's nothing
> special about these numbers as you also noticed.
> Have you tried setting fetch.max.wait.ms = 0 and see whether that's making
> a difference for you?
> 
> Thanks,
> 
> 
> On Thu, 5 Mar 2020 at 03:43, James Olsen 
> <ja...@inaseq.com<mailto:ja...@inaseq.com>> wrote:
> 
> I’m seeing behaviour that I don’t understand when I have Consumers
> fetching from multiple Partitions from the same Topic.  There are two
> different conditions arising:
> 
> 1. A subset of the Partitions allocated to a given Consumer not being
> consumed at all.  The Consumer appears healthy, the Thread is running and
> logging activity and is successfully processing records from some of the
> Partitions it has been assigned.  I don’t think this is due to the first
> Partition fetched filling a Batch (KIP-387).  The problem does not occur if
> we have a particular number of Consumers (3 in this case) but it has failed
> with a range of other larger values.  I don’t think there is anything
> special about 3 - it just happens to work OK with that value although it is
> the same as the Broker and Replica count.  When we tried 6, 5 Consumers
> were fine but 1 exhibited this issue.
> 
> 2. Up to a half second delay between Producer sending and Consumer
> receiving a message.  This looks suspiciously like the fetch.max.wait.ms=500
> but we also have fetch.min.bytes=1 so should get messages as soon as
> something is available.  The only explanation I can think of is if the
> fetch.max.wait.ms is applied in full to the first Partition checked and
> it remains empty for the duration.  Then it moves on to a subsequent
> non-empty Partition and delivers messages from there.
> 
> Our environment is AWS MSK (Kafka 2.2.1) and Kafka Java client 2.4.0.
> 
> All environments appear healthy and under light load, e.g. clients only
> operating at a 1-2% CPU, Brokers (3) at 5-10% CPU.   No swap, no crashes,
> no dead threads etc.
> 
> Typical scenario is a Topic with 60 Partitions, 3 Replicas and a single
> ConsumerGroup with 5 Consumers.  The Partitioning is for semantic purposes
> with the intention being to add more Consumers as the business grows and
> load increases.  Some of the Partitions are always empty due to using short
> string keys and the default Partitioner - we will probably implement a
> custom Partitioner to achieve better distribution in the near future.
> 
> I don’t have access to the detailed JMX metrics yet but am working on that
> in the hope it will help diagnose.
> 
> Thoughts and advice appreciated!
> 

Reply via email to