[https://issues.apache.org/jira/browse/KAFKA-8334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Lucas Bradstreet updated KAFKA-8334:
------------------------------------
Affects Version/s: (was: 1.1.0)
> Occasional OffsetCommit Timeout
> -------------------------------
>
> Key: KAFKA-8334
> URL: https://issues.apache.org/jira/browse/KAFKA-8334
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.1, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0, 2.5.0, 2.6.0
> Reporter: windkithk
> Assignee: Chia-Ping Tsai
> Priority: Major
> Fix For: 2.7.0
>
> Attachments: kafka8334.patch, offsetcommit_p99.9_cut.jpg
>
>
> h2. 1) Issue Summary
> Since upgrading to 1.1, we have observed occasional OffsetCommit timeouts on the client side:
> {code:java}
> Offset commit failed on partition sometopic-somepartition at offset someoffset: The request timed out{code}
> Normally an OffsetCommit completes within 10 ms, but at the 99.9th percentile the request duration jumps to 5000 ms, which is the value of offsets.commit.timeout.ms.
> The attached screenshot (offsetcommit_p99.9_cut.jpg) shows the Prometheus metric kafka_network_request_duration_milliseconds.
> Breaking the duration down shows that almost all of the time is spent in the "Remote" scope. Below is a request log line produced by our in-house slow request logger:
> {code:java}
> [2019-04-16 13:06:20,339] WARN Slow response:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=953) -- {group_id=wilson-tester,generation_id=28,member_id=kafka-python-1.4.6-69ed979d-a069-4c6d-9862-e4fc34883269,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=63456,metadata=null}]}]} from connection;totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.request.logger)
> {code}
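> The breakdown can be checked mechanically. The Java sketch below is our own helper, not part of Kafka: it pulls the fields ending in Time out of a slow-request log line like the one above and computes the share of totalTime spent in remoteTime, which is roughly the time the request waited in a purgatory. The regex assumes the name:value layout shown in that line.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts "<name>Time:<ms>" fields from a slow-request
// log line and reports how much of totalTime was spent in the Remote phase.
class SlowLogBreakdown {
    // Matches pairs such as "remoteTime:5001.173000".
    private static final Pattern FIELD = Pattern.compile("(\\w+Time):([0-9.]+)");

    static Map<String, Double> timings(String logLine) {
        Map<String, Double> out = new LinkedHashMap<>();
        Matcher m = FIELD.matcher(logLine);
        while (m.find()) {
            out.put(m.group(1), Double.parseDouble(m.group(2)));
        }
        return out;
    }

    // Fraction of totalTime spent in remoteTime (waiting on a delayed operation).
    static double remoteShare(String logLine) {
        Map<String, Double> t = timings(logLine);
        return t.get("remoteTime") / t.get("totalTime");
    }

    public static void main(String[] args) {
        // Timing section copied from the slow-request log line above.
        String line = "totalTime:5001.942000,requestQueueTime:0.030000,localTime:0.574000,"
                + "remoteTime:5001.173000,responseQueueTime:0.058000,sendTime:0.053000";
        timings(line).forEach((k, v) -> System.out.printf("%-18s %10.3f ms%n", k, v));
        System.out.printf("remote share of total: %.2f%%%n", 100 * remoteShare(line));
    }
}
```

> Run against the timings above, it reports a remote share of about 99.98%: the request spent virtually its whole 5 s waiting, not being processed.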
> h2. 2) What changed in 1.1 compared with 0.10.2.1?
> # Log level changed
> In the 1.1 Kafka consumer, the log message for a timed-out OffsetCommit was raised from DEBUG to WARN.
> # The group lock is acquired when trying to complete the DelayedProduce of an OffsetCommit
> This was added after 0.11.0.2
> (ticket: https://issues.apache.org/jira/browse/KAFKA-6042,
> PR: https://github.com/apache/kafka/pull/4103,
> code in 1.1: https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L292)
> # Followers use incremental fetch requests (KIP-227)
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> h2. 3) Interaction between OffsetCommit, DelayedProduce and FetchFollower
> {quote}
> OffsetCommit appends a message containing the committed offset to a partition of the topic `__consumer_offsets`.
> During the append, it creates a DelayedProduce that uses GroupMetadata.lock (a ReentrantLock) as its lock, and adds it to delayedProducePurgatory.
> When a follower fetches that partition of `__consumer_offsets` and the high watermark advances, delayedProducePurgatory is traversed and any operations on that partition may be completed.
> {quote}
> *The DelayedProduce from an OffsetCommit may not be completed if the group metadata lock is held by another thread.*
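> A minimal Java model of this interaction, under simplifying assumptions: one DelayedCommit per offset append, a HashMap standing in for the purgatory's watcher lists, and completion requiring both HW >= requiredOffset and a successful ReentrantLock.tryLock() on a lock standing in for GroupMetadata.lock. DelayedCommit, PurgatoryModel and run are hypothetical names; this is not Kafka's implementation. The offsets 23134/23135 are taken from the trace log in section 4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model (not Kafka's code): delayed offset-commit appends keyed by
// partition, whose completion needs the group lock.
class PurgatoryModel {
    static final class DelayedCommit {
        final ReentrantLock groupLock; // stands in for GroupMetadata.lock
        final long requiredOffset;     // HW needed before the commit can be acked
        boolean completed;

        DelayedCommit(ReentrantLock groupLock, long requiredOffset) {
            this.groupLock = groupLock;
            this.requiredOffset = requiredOffset;
        }

        // Non-blocking attempt: if another thread holds the group lock, give up.
        boolean maybeTryComplete(long highWatermark) {
            if (!groupLock.tryLock()) return false;
            try {
                if (highWatermark >= requiredOffset) completed = true;
                return completed;
            } finally {
                groupLock.unlock();
            }
        }
    }

    private final Map<String, List<DelayedCommit>> watchers = new HashMap<>();

    void watch(String partition, DelayedCommit op) {
        watchers.computeIfAbsent(partition, k -> new ArrayList<>()).add(op);
    }

    // Invoked when a follower fetch advances the partition's high watermark.
    int checkAndComplete(String partition, long highWatermark) {
        int completedNow = 0;
        Iterator<DelayedCommit> it = watchers.getOrDefault(partition, new ArrayList<>()).iterator();
        while (it.hasNext()) {
            if (it.next().maybeTryComplete(highWatermark)) {
                it.remove();
                completedNow++;
            }
        }
        return completedNow;
    }

    // Counts commits completed once HW reaches requiredOffset, optionally while
    // a second thread (playing the Heartbeat) holds the group lock.
    static int run(boolean heartbeatHoldsLock) {
        ReentrantLock groupLock = new ReentrantLock();
        PurgatoryModel purgatory = new PurgatoryModel();
        purgatory.watch("__consumer_offsets-48", new DelayedCommit(groupLock, 23135));

        CountDownLatch locked = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread heartbeat = new Thread(() -> {
            groupLock.lock();
            try {
                locked.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                groupLock.unlock();
            }
        });
        heartbeat.setDaemon(true);
        try {
            if (heartbeatHoldsLock) {
                heartbeat.start();
                locked.await();
            }
            return purgatory.checkAndComplete("__consumer_offsets-48", 23135);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // Heartbeat response: lock released
        }
    }

    public static void main(String[] args) {
        System.out.println("lock free: completed " + run(false));
        System.out.println("lock held: completed " + run(true));
    }
}
```

> With the lock free, the commit completes (prints 1). With the lock held by the other thread, nothing completes (prints 0) and the commit stays in the watcher list, with no one scheduled to look at it again.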
> h2. 4) Reproduce
> h4. Methodology
> {code}
> 1. A DelayedProduce on __consumer_offsets cannot be completed while group.lock is held by another request.
> 2. We flood the coordinator with requests such as Heartbeat so that group.lock is acquired repeatedly.
> 3. We keep sending OffsetCommit requests and check their processing time.
> {code}
> h4. Reproduce Script
> https://gist.github.com/windkit/3384bb86dc146111d1e0857e66b85861
> # jammer.py - joins the group "wilson-tester" and keeps sending Heartbeat requests
> # tester.py - fetches one message, does some long processing (or sleeps), and then commits the offset
> h4. Result
> ||Seq||Operation||Lock||
> |1|OffsetCommit Request| |
> |2|Append to local __consumer_offsets| |
> |3|DelayedProduce tryComplete| |
> |4|Added to delayedProducePurgatory| |
> |5|FetchFollower1 Fetch| |
> |6|FetchFollower2 Fetch| |
> |7|Heartbeat Request|Acquired group.lock|
> |8|FetchFollower2 maybeTryComplete DelayedProduce|Failed to acquire group.lock|
> |9|Heartbeat Response|Released group.lock|
> | |(No further FetchFollower requests on the __consumer_offsets partition)| |
> |10|OffsetCommit Response (Timeout)| |
> h4. Trace Log
> {code}
> // The OffsetCommit Request
> [2019-04-15 23:59:53,736] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=kafka-python-1.4.6, correlationId=2114) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-60008b58-4d6a-4cfd-948f-dd9e19e7f981,retention_time=-1,topics=[{topic=test,partitions=[{partition=2,offset=22654,metadata=null}]}]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
>
> // Initial Check of DelayedProduce:tryCompleteElseWatch
> // https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L217
> [2019-04-15 23:59:53,736] TRACE Initial partition status for __consumer_offsets-48 is [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
> [2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
> [2019-04-15 23:59:53,736] TRACE Checking produce satisfaction for __consumer_offsets-48, current status [acksPending: true, error: 7, startOffset: 23134, requiredOffset: 23135] (kafka.server.DelayedProduce)
>
> // Followers fetch the new message in __consumer_offsets-48, with Heartbeats in between
> // DelayedOperation:maybeTryComplete leaves the DelayedProduce unchecked when it cannot obtain the lock (group.lock)
> // https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/server/DelayedOperation.scala#L371
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@4388ce00 for correlation id 1492702 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492703) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@96e1b92 for correlation id 1492703 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-5-fetcher-0, correlationId=1788883) -- {replica_id=5,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=357018451,epoch=1788882,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492704) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,737] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@6cb7676f for correlation id 1492704 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=HEARTBEAT, apiVersion=1, clientId=kafka-python-1.4.6, correlationId=1492705) -- {group_id=wilson-tester,generation_id=20,member_id=kafka-python-1.4.6-4910b677-ecf2-4d1e-9fd0-a705aa37e0a6} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=broker-4-fetcher-0, correlationId=1788529) -- {replica_id=4,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1188767819,epoch=1788528,topics=[{topic=__consumer_offsets,partitions=[{partition=48,fetch_offset=23135,log_start_offset=0,max_bytes=1048576}]}],forgetten_topics_data=[]} from connection;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
> [2019-04-15 23:59:53,738] TRACE [KafkaApi-1] Sending heartbeat response org.apache.kafka.common.requests.HeartbeatResponse@5552d3ab for correlation id 1492705 to client kafka-python-1.4.6. (kafka.server.KafkaApis)
>
> // OffsetCommit Timed out
> [2019-04-15 23:59:58,737] DEBUG [KafkaApi-1] Offset commit request with correlation id 2114 from client kafka-python-1.4.6 on partition test-2 failed due to org.apache.kafka.common.errors.TimeoutException (kafka.server.KafkaApis)
> {code}
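> The trace also shows why FetchFollower2's fetch is the one that matters. Assume the ISR of __consumer_offsets-48 is brokers 1 (leader), 4 and 5, and that broker 4 was at 23134 before its fetch; neither is stated in the log. The high watermark is the minimum log end offset (LEO) across the ISR, with each follower's LEO taken from its fetch_offset. Broker 5's fetch at 23135 leaves the HW at 23134; broker 4's fetch at 23135 moves it to 23135, which equals requiredOffset. Under these assumptions the DelayedProduce was satisfiable at 23:59:53,738 and only the lock prevented completion. A small Java sketch of that arithmetic:

```java
import java.util.List;
import java.util.Map;

// Worked example (simplified): HW = minimum log end offset (LEO) across the ISR,
// where a follower's LEO is taken from the fetch_offset of its latest fetch.
class HighWatermarkExample {
    static long highWatermark(Map<Integer, Long> isrLogEndOffsets) {
        return isrLogEndOffsets.values().stream().mapToLong(Long::longValue).min().getAsLong();
    }

    // The acks condition the offset-commit DelayedProduce is waiting for.
    static boolean acksSatisfied(long highWatermark, long requiredOffset) {
        return highWatermark >= requiredOffset;
    }

    public static void main(String[] args) {
        long requiredOffset = 23135; // from the trace: startOffset 23134, requiredOffset 23135
        // Assumption: ISR = brokers 1 (leader), 4 and 5; broker 4 was at 23134 before its fetch.
        Map<Integer, Long> afterBroker5Fetch = Map.of(1, 23135L, 5, 23135L, 4, 23134L);
        Map<Integer, Long> afterBroker4Fetch = Map.of(1, 23135L, 5, 23135L, 4, 23135L);
        for (Map<Integer, Long> leos : List.of(afterBroker5Fetch, afterBroker4Fetch)) {
            long hw = highWatermark(leos);
            System.out.println("HW=" + hw + " satisfied=" + acksSatisfied(hw, requiredOffset));
        }
    }
}
```

> It prints HW=23134 satisfied=false, then HW=23135 satisfied=true.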
> h4. OffsetCommit, FetchFollower, Heartbeat
> When a follower fetch arrives and advances the high watermark, it is supposed to complete the DelayedProduce and hence the OffsetCommit request.
> However, it only does so if it can obtain the group metadata lock, and it retries only once, immediately.
> As a result, the DelayedProduce is only checked again the next time a follower fetch advances the high watermark, which usually means after the next OffsetCommit.
> This explains why we observe OffsetCommit timeouts (high OffsetCommit latency) during periods of low traffic.
> ||OffsetCommit||FetchFollower||Heartbeat||
> |replicaManager.appendRecords| | |
> |→ delayedProducePurgatory.tryCompleteElseWatch| | |
> |→ tryComplete()| | |
> |→ watchForOperation()| | |
> |→ operation.maybeTryComplete()| | |
> | |partition.updateReplicaLogReadResult| |
> | |→ tryCompleteDelayedRequests| |
> | |→ delayedProducePurgatory.checkAndComplete| |
> | |→ watchers.tryCompleteWatched| |
> | |→ operation.maybeTryComplete()|group.lock|
> | |→ group.tryLock| |
> | |→ false| |
> | | |group.unlock|
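> The "retry only once, immediately" behaviour can be illustrated with a simplified Java model of a tryLock-plus-pending-flag loop. The loop structure and the tryCompletePending name follow our reading of DelayedOperation.maybeTryComplete in 1.1 (linked in the trace log above), but this is a stand-alone sketch, not a copy of that code. The flag only helps if the thread holding the lock is itself a completer that will re-check it; a Heartbeat holding group.lock is not, so the attempt is simply lost.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model of a non-blocking completion attempt with a "pending" flag.
class MaybeTryCompleteModel {
    final ReentrantLock groupLock = new ReentrantLock();
    final AtomicBoolean tryCompletePending = new AtomicBoolean(false);
    int attempts;               // number of tryLock attempts made
    volatile boolean completed;

    // In the scenario above the HW already reached requiredOffset,
    // so completing only needs the lock.
    private boolean tryComplete() {
        completed = true;
        return true;
    }

    boolean maybeTryComplete() {
        boolean retry;
        boolean done = false;
        do {
            attempts++;
            if (groupLock.tryLock()) {
                try {
                    tryCompletePending.set(false);
                    done = tryComplete();
                } finally {
                    groupLock.unlock();
                }
                // Another completer flagged a pending attempt while we held the lock.
                retry = tryCompletePending.get();
            } else {
                // Retry only if nobody had flagged a pending attempt yet. The flag
                // assumes the lock holder will re-check it; a Heartbeat never does.
                retry = !tryCompletePending.getAndSet(true);
            }
        } while (!completed && retry);
        return done;
    }

    // FetchFollower thread calls maybeTryComplete() while a Heartbeat holds the lock.
    static MaybeTryCompleteModel attemptWhileHeartbeatHoldsLock() {
        MaybeTryCompleteModel op = new MaybeTryCompleteModel();
        CountDownLatch locked = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread heartbeat = new Thread(() -> {
            op.groupLock.lock();
            try {
                locked.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                op.groupLock.unlock();
            }
        });
        heartbeat.setDaemon(true);
        heartbeat.start();
        try {
            locked.await();
            op.maybeTryComplete(); // fails: lock busy, retried once, then gives up
            release.countDown();   // Heartbeat response: lock released
            heartbeat.join();      // ...but nothing re-checks the operation
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return op;
    }

    public static void main(String[] args) {
        MaybeTryCompleteModel op = attemptWhileHeartbeatHoldsLock();
        System.out.println("attempts=" + op.attempts + " completed=" + op.completed
                + " pending=" + op.tryCompletePending.get());
    }
}
```

> It prints attempts=2 completed=false pending=true: the FetchFollower thread tries twice, gives up, and after the Heartbeat releases the lock nothing runs the completion again.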
> h2. 5) Solution
> We could use a separate executor that retries, shortly afterwards, the completion of any DelayedOperation that failed to obtain its lock.
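> A sketch of this proposal in Java, assuming a dedicated ScheduledExecutorService: when tryLock fails, the attempt is rescheduled after a short delay instead of being dropped. RetryingCompleter, the 10 ms delay and the demo are hypothetical; for brevity the sketch does not stop retrying once the operation has completed or expired, which a real implementation would need to do.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: if the completion attempt cannot take the lock, schedule
// a retry on a dedicated executor instead of waiting for the next HW advance.
class RetryingCompleter implements AutoCloseable {
    private final ScheduledExecutorService retryExecutor =
            Executors.newSingleThreadScheduledExecutor();
    private final long retryDelayMs;

    RetryingCompleter(long retryDelayMs) {
        this.retryDelayMs = retryDelayMs;
    }

    // tryComplete runs under the lock. Not handled here: stopping the retries
    // once the operation has completed or expired.
    void maybeTryComplete(ReentrantLock lock, BooleanSupplier tryComplete) {
        if (lock.tryLock()) {
            try {
                tryComplete.getAsBoolean();
            } finally {
                lock.unlock();
            }
        } else {
            retryExecutor.schedule(() -> maybeTryComplete(lock, tryComplete),
                    retryDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void close() {
        retryExecutor.shutdownNow();
    }

    // A Heartbeat thread holds the lock during the first attempt; the scheduled
    // retry completes the operation after release. Returns true if it completed.
    static boolean demo() {
        ReentrantLock groupLock = new ReentrantLock();
        CountDownLatch locked = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch completed = new CountDownLatch(1);
        Thread heartbeat = new Thread(() -> {
            groupLock.lock();
            try {
                locked.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                groupLock.unlock();
            }
        });
        heartbeat.setDaemon(true);
        try (RetryingCompleter completer = new RetryingCompleter(10)) {
            heartbeat.start();
            locked.await();
            completer.maybeTryComplete(groupLock, () -> {
                completed.countDown();
                return true;
            });
            release.countDown(); // Heartbeat finishes and releases group.lock
            return completed.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("completed after retry: " + demo());
    }
}
```

> In the demo the first attempt fails while the other thread holds the lock, and a scheduled retry completes the operation shortly after the lock is released, so demo() returns true. The fetch path itself never blocks on group.lock; the cost is an extra thread and repeated retries while the lock stays busy.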