[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681132#comment-16681132 ]
sachin commented on KAFKA-7280:
-------------------------------

[~rsivaram]
* I am on version "1.1.0" and the following is the stack trace. Is this a fatal error? Or will the heartbeat thread restart upon a subsequent poll() or commitSync()?
{code:java}
2018-11-08 16:53:57,456 [kafka-coordinator-heartbeat-thread | ConsumerNginxRawCW] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=ConsumerNginxRawCW] Heartbeat thread failed due to unexpected error
java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
    at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734)
    at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
    at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:423)
    at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:212)
    at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
* I have not enabled TRACE level logging.
Following is the log4j2 config:
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" monitorInterval="60">
  <ThresholdFilter level="trace" />
  <Appenders>
    <RollingFile name="Appender" fileName="/var/log/worker-nginx-raw.log" filePattern="/var/log/worker-nginx-raw-%d{yyyy-MM-dd}-%i.log.gz">
      <PatternLayout>
        <pattern>%d [%t] %-5level %logger{36} - %msg%n</pattern>
      </PatternLayout>
      <Policies>
        <SizeBasedTriggeringPolicy size="100 MB" />
        <TimeBasedTriggeringPolicy interval="1" />
      </Policies>
    </RollingFile>
  </Appenders>
  <Loggers>
    <Logger name="com.kafka-client" additivity="false">
      <AppenderRef ref="Appender" level="info" />
    </Logger>
    <Root level="all">
      <AppenderRef ref="Appender" level="error" />
    </Root>
  </Loggers>
</Configuration>
{code}
* Is there any workaround to avoid this problem? For example, listen for the above exception from the heartbeat thread, wake up the thread doing the actual poll/commit, and then spawn another thread that reinitializes everything, including subscribe(...)?

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7280
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7280
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 1.1.1, 2.0.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Critical
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Request/response handling in FetchSessionHandler is not thread-safe. But we
> are using it in Kafka consumer without any synchronization even though poll()
> from heartbeat thread can process responses. Heartbeat thread holds the
> coordinator lock while processing its poll and responses, making other
> operations involving the group coordinator safe. We also need to lock
> FetchSessionHandler for the operations that update or read
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, groupId=group] Heartbeat thread failed due to unexpected error (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> java.util.ConcurrentModificationException
>     at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>     at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>     at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
>     at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>
> The logs just prior to the exception show that a partition was removed from
> the session:
> {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, groupId=group] Skipping fetch for partition
test_topic-1 because there is an in-flight request to worker4:9095 (id: 3 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, groupId=group] Completed receive from node 2 for FETCH with correlation id 417, received {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Added READ_UNCOMMITTED fetch request for partition test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for node 2.
Added (), altered (), removed (test_topic-2) out of (test_topic-0) (org.apache.kafka.clients.FetchSessionHandler)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, groupId=group] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=109800960,epoch=237,topics=[],forgotten_topics_data=[{topic=test_topic,partitions=[2]}]} with correlation id 418 to node 2 (org.apache.kafka.clients.NetworkClient)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, groupId=group] Skipping fetch for partition test_topic-2 because there is an in-flight request to worker3:9095 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, groupId=group] Heartbeat thread failed due to unexpected error (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> java.util.ConcurrentModificationException
> {quote}
> The sequence in the logs shows:
> # FETCH response received
> # FetchSessionHandler#sessionPartitions is updated (a partition is removed)
> # New FETCH request is sent
> # Heartbeat thread throws ConcurrentModificationException while iterating
> over FetchSessionHandler#sessionPartitions
> This could be because 1) and 4) were on the heartbeat thread and 2) and 3) on
> the thread processing Consumer#poll().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
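
The failure mode described in the issue (one thread iterating a LinkedHashMap while another removes an entry) can be sketched outside Kafka. The following is a minimal illustration only, not the actual Kafka fix: the class SessionPartitionsSketch and its methods are hypothetical stand-ins for FetchSessionHandler#sessionPartitions, and the sketch assumes that a single shared monitor around every read and update is an acceptable form of the locking the issue asks for.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

public class SessionPartitionsSketch {
    // Hypothetical stand-in for FetchSessionHandler#sessionPartitions.
    private final Map<String, Long> sessionPartitions = new LinkedHashMap<>();

    // All updates and reads below share one monitor (this).
    public synchronized void put(String partition, long offset) {
        sessionPartitions.put(partition, offset);
    }

    public synchronized void remove(String partition) {
        sessionPartitions.remove(partition);
    }

    // Iterates over the keys to build a log string, similar in spirit
    // to the key iteration seen in the stack trace.
    public synchronized String describe() {
        StringBuilder sb = new StringBuilder("(");
        String sep = "";
        for (String partition : sessionPartitions.keySet()) {
            sb.append(sep).append(partition);
            sep = ", ";
        }
        return sb.append(")").toString();
    }

    // Single-threaded reproduction of the failure mode: a structural
    // modification in the middle of key iteration.
    public static boolean unguardedIterationFails() {
        Map<String, Long> map = new LinkedHashMap<>();
        map.put("test_topic-0", 0L);
        map.put("test_topic-1", 0L);
        map.put("test_topic-2", 0L);
        try {
            for (String ignored : map.keySet()) {
                map.remove("test_topic-2"); // removal while iterating
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // One thread mutates while another iterates, both through the
    // synchronized methods. Returns true if no exception was observed.
    public static boolean guardedAccessSucceeds(int rounds) {
        SessionPartitionsSketch session = new SessionPartitionsSketch();
        session.put("test_topic-0", 0L);
        Throwable[] failure = new Throwable[1];
        Thread poller = new Thread(() -> {
            for (int i = 0; i < rounds; i++) {
                session.put("test_topic-2", i);
                session.remove("test_topic-2");
            }
        });
        Thread heartbeat = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    session.describe();
                }
            } catch (Throwable t) {
                failure[0] = t;
            }
        });
        poller.start();
        heartbeat.start();
        try {
            poller.join();
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return failure[0] == null;
    }

    public static void main(String[] args) {
        System.out.println("unguarded iteration fails: " + unguardedIterationFails());
        System.out.println("guarded access succeeds: " + guardedAccessSucceeds(100_000));
    }
}
```

The sketch only shows why the read path (building the log string) needs the same lock as the update path. It says nothing about whether a heartbeat thread that has already failed can be recovered on 1.1.0.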