[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram updated KAFKA-7280:
----------------------------------
    Description: 
Request/response handling in FetchSessionHandler is not thread-safe, but the Kafka 
consumer uses it without any synchronization even though poll() from the 
heartbeat thread can process responses. The heartbeat thread holds the 
coordinator lock while processing its poll and responses, which makes other 
operations involving the group coordinator safe. We also need to lock 
FetchSessionHandler for the operations that update or read 
FetchSessionHandler#sessionPartitions.
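A minimal sketch of the locking described above, under simplifying assumptions: `LockedSessionPartitions` is a hypothetical class (it is not Kafka's FetchSessionHandler API), partitions are plain `String` names and fetch offsets are `long`. Every method that reads or updates the map is `synchronized` on the same instance, so a removal on the poll thread cannot interleave with an iteration on the heartbeat thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical, simplified stand-in for the session state in FetchSessionHandler.
 * Not Kafka's API: it only shows sessionPartitions guarded by one lock that both
 * the poll thread and the heartbeat thread must take.
 */
class LockedSessionPartitions {
    // Insertion-ordered, like the LinkedHashMap in the stack trace; only touched under the lock.
    private final Map<String, Long> sessionPartitions = new LinkedHashMap<>();

    // Update path (building a fetch request): add or update a partition's fetch offset.
    public synchronized void put(String partition, long fetchOffset) {
        sessionPartitions.put(partition, fetchOffset);
    }

    // Update path (building an incremental fetch): drop a partition from the session.
    public synchronized void remove(String partition) {
        sessionPartitions.remove(partition);
    }

    // Read path (handling a response, e.g. for logging): the iteration runs while holding
    // the same monitor, so no put/remove can interleave with it.
    public synchronized String toLogString() {
        return String.join(", ", sessionPartitions.keySet());
    }

    /** Runs an updating "poll" thread against an iterating "heartbeat" thread; true if no error. */
    static boolean runConcurrently(int iterations) {
        LockedSessionPartitions shared = new LockedSessionPartitions();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread poller = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                shared.put("test_topic-" + (i % 4), i);
                shared.remove("test_topic-" + ((i + 2) % 4));
            }
        });
        Thread heartbeat = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    shared.toLogString();
                }
            } catch (Throwable t) {
                failure.set(t); // would capture a ConcurrentModificationException if unlocked
            }
        });
        poller.start();
        heartbeat.start();
        try {
            poller.join();
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return failure.get() == null;
    }
}
```

Synchronizing on the handler instance keeps the change local to the class. Copying the keys before logging would only protect the logging path, not the other reads and updates of sessionPartitions.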

The following exception is from a system test run on trunk of 
TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
{quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
groupId=group] Heartbeat thread failed due to unexpected error 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
 java.util.ConcurrentModificationException
 at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
 at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
 at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
 at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
 at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
 at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
{quote}
 

The logs just prior to the exception show that a partition was removed from the 
session:
{quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
groupId=group] Skipping fetch for partition test_topic-1 because there is an 
in-flight request to worker4:9095 (id: 3 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
groupId=group] Completed receive from node 2 for FETCH with correlation id 417, 
received 
{throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]} 
(org.apache.kafka.clients.NetworkClient)
[2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
groupId=group] Added READ_UNCOMMITTED fetch request for partition test_topic-0 
at offset 189 to node worker3:9095 (id: 2 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
(org.apache.kafka.clients.FetchSessionHandler)
[2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), 
toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 2 
rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
groupId=group] Sending FETCH 
{replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=109800960,epoch=237,topics=[],forgotten_topics_data=[{topic=test_topic,partitions=[2]}]} 
with correlation id 418 to node 2 (org.apache.kafka.clients.NetworkClient)
[2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
groupId=group] Skipping fetch for partition test_topic-2 because there is an 
in-flight request to worker3:9095 (id: 2 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
groupId=group] Heartbeat thread failed due to unexpected error 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
java.util.ConcurrentModificationException
{quote}
The sequence in the logs shows:
# FETCH response received
# FetchSessionHandler#sessionPartitions is updated (a partition is removed)
# New FETCH request is sent
# Heartbeat thread throws ConcurrentModificationException while iterating over FetchSessionHandler#sessionPartitions

This could be because steps 1 and 4 ran on the heartbeat thread, while steps 2 
and 3 ran on the thread processing Consumer#poll().
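The fail-fast behavior behind steps 2 and 4 can be reproduced deterministically on a single thread. This is an illustrative sketch, not Kafka code: `FailFastIteratorDemo` is a hypothetical name, and the remove inside the loop stands in for the poll thread updating sessionPartitions while the heartbeat thread is still iterating over it. With two real threads the fail-fast check is only best-effort, so an unsynchronized race may or may not surface as this exception.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical demo: LinkedHashMap iterators are fail-fast, so a structural
 * modification made while an iteration is in progress causes the iterator's
 * next call to next() to throw ConcurrentModificationException.
 */
class FailFastIteratorDemo {
    static boolean removalDuringIterationThrows() {
        Map<String, Long> sessionPartitions = new LinkedHashMap<>();
        sessionPartitions.put("test_topic-0", 189L);
        sessionPartitions.put("test_topic-2", 184L);
        try {
            // Plays the role of the heartbeat thread iterating in step 4.
            for (String partition : sessionPartitions.keySet()) {
                if (partition.equals("test_topic-0")) {
                    // Plays the role of the poll thread removing a partition in step 2.
                    sessionPartitions.remove("test_topic-2");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```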

 

  was:
Request/response handling in FetchSessionHandler is not thread-safe, but the Kafka 
consumer uses it without any synchronization even though poll() from the 
heartbeat thread can process responses. The heartbeat thread holds the 
coordinator lock while processing its poll and responses, which makes other 
operations involving the group coordinator safe. We also need to lock 
FetchSessionHandler for the operations that update or read 
FetchSessionHandler#sessionPartitions.

The following exception is from a system test run on trunk of 
TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
{quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
groupId=group] Heartbeat thread failed due to unexpected error 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
 java.util.ConcurrentModificationException
 at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
 at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
 at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
 at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
 at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
 at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
{quote}
 

The logs just prior to the exception show that a partition was removed from the 
session:
{quote}
[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, 
groupId=group] Skipping fetch for partition test_topic-1 because there is an 
in-flight request to worker4:9095 (id: 3 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
groupId=group] Completed receive from node 2 for FETCH with correlation id 417, 
received 
{throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]} 
(org.apache.kafka.clients.NetworkClient)
[2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
groupId=group] Added READ_UNCOMMITTED fetch request for partition test_topic-0 
at offset 189 to node worker3:9095 (id: 2 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for 
node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) 
(org.apache.kafka.clients.FetchSessionHandler)
[2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, 
groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), 
toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 2 
rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
groupId=group] Sending FETCH 
{replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=109800960,epoch=237,topics=[],forgotten_topics_data=[{topic=test_topic,partitions=[2]}]} 
with correlation id 418 to node 2 (org.apache.kafka.clients.NetworkClient)
[2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, 
groupId=group] Skipping fetch for partition test_topic-2 because there is an 
in-flight request to worker3:9095 (id: 2 rack: null) 
(org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, 
groupId=group] Heartbeat thread failed due to unexpected error 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
java.util.ConcurrentModificationException
{quote}

 


> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7280
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7280
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 1.1.1, 2.0.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
